NewPipe/app/src/main/java/org/schabi/newpipe/player/playback/MediaSourceManager.java

325 lines
11 KiB
Java

package org.schabi.newpipe.player.playback;
import android.support.annotation.Nullable;
import android.util.Log;
import com.google.android.exoplayer2.source.DynamicConcatenatingMediaSource;
import com.google.android.exoplayer2.source.MediaSource;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.schabi.newpipe.extractor.stream.StreamInfo;
import org.schabi.newpipe.player.mediasource.DeferredMediaSource;
import org.schabi.newpipe.playlist.PlayQueue;
import org.schabi.newpipe.playlist.PlayQueueItem;
import org.schabi.newpipe.playlist.events.PlayQueueMessage;
import org.schabi.newpipe.playlist.events.RemoveEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.SerialDisposable;
import io.reactivex.functions.Consumer;
public class MediaSourceManager implements DeferredMediaSource.Callback {
private final String TAG = "MediaSourceManager@" + Integer.toHexString(hashCode());
// One-side rolling window size for default loading
// Effectively loads WINDOW_SIZE * 2 + 1 streams, should be at least 1 to ensure gapless playback
// todo: inject this parameter, allow user settings perhaps
private static final int WINDOW_SIZE = 2;
private PlaybackListener playbackListener;
private PlayQueue playQueue;
private DynamicConcatenatingMediaSource sources;
// sourceToQueueIndex maps media source index to play queue index
// Invariant 1: this list is sorted in ascending order
// Invariant 2: this list contains no duplicates
private List<Integer> sourceToQueueIndex;
private Subscription playQueueReactor;
private SerialDisposable syncReactor;
private boolean isBlocked;
public MediaSourceManager(@NonNull final PlaybackListener listener,
@NonNull final PlayQueue playQueue) {
this.playbackListener = listener;
this.playQueue = playQueue;
this.syncReactor = new SerialDisposable();
this.sources = new DynamicConcatenatingMediaSource();
this.sourceToQueueIndex = Collections.synchronizedList(new ArrayList<Integer>());
playQueue.getBroadcastReceiver()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getReactor());
}
/*//////////////////////////////////////////////////////////////////////////
// DeferredMediaSource listener
//////////////////////////////////////////////////////////////////////////*/
@Override
public MediaSource sourceOf(StreamInfo info) {
return playbackListener.sourceOf(info);
}
/*//////////////////////////////////////////////////////////////////////////
// Exposed Methods
//////////////////////////////////////////////////////////////////////////*/
/**
* Returns the media source index of the currently playing stream.
* */
public int getCurrentSourceIndex() {
return sourceToQueueIndex.indexOf(playQueue.getIndex());
}
/**
* Returns the play queue index of a given media source playlist index.
* */
public int getQueueIndexOf(final int sourceIndex) {
if (sourceIndex < 0 || sourceIndex >= sourceToQueueIndex.size()) return -1;
return sourceToQueueIndex.get(sourceIndex);
}
/**
* Dispose the manager and releases all message buses and loaders.
* */
public void dispose() {
if (playQueueReactor != null) playQueueReactor.cancel();
if (syncReactor != null) syncReactor.dispose();
if (sources != null) sources.releaseSource();
if (sourceToQueueIndex != null) sourceToQueueIndex.clear();
playQueueReactor = null;
syncReactor = null;
sources = null;
sourceToQueueIndex = null;
playbackListener = null;
playQueue = null;
}
/**
* Loads the current playing stream and the streams within its WINDOW_SIZE bound.
*
* Unblocks the player once the item at the current index is loaded.
* */
public void load() {
// The current item has higher priority
final int currentIndex = playQueue.getIndex();
final PlayQueueItem currentItem = playQueue.getItem(currentIndex);
if (currentItem == null) return;
load(currentItem);
// The rest are just for seamless playback
final int leftBound = Math.max(0, currentIndex - WINDOW_SIZE);
final int rightLimit = currentIndex + WINDOW_SIZE + 1;
final int rightBound = Math.min(playQueue.size(), rightLimit);
final List<PlayQueueItem> items = new ArrayList<>(playQueue.getStreams().subList(leftBound, rightBound));
// Do a round robin
final int excess = rightLimit - playQueue.size();
if (excess >= 0) items.addAll(playQueue.getStreams().subList(0, Math.min(playQueue.size(), excess)));
for (final PlayQueueItem item: items) load(item);
}
/**
* Blocks the player and repopulate the sources.
*
* Does not ensure the player is unblocked and should be done explicitly through {@link #load() load}.
* */
public void reset() {
tryBlock();
resetSources();
populateSources();
}
/*//////////////////////////////////////////////////////////////////////////
// Event Reactor
//////////////////////////////////////////////////////////////////////////*/
private Subscriber<PlayQueueMessage> getReactor() {
return new Subscriber<PlayQueueMessage>() {
@Override
public void onSubscribe(@NonNull Subscription d) {
if (playQueueReactor != null) playQueueReactor.cancel();
playQueueReactor = d;
playQueueReactor.request(1);
}
@Override
public void onNext(@NonNull PlayQueueMessage playQueueMessage) {
onPlayQueueChanged(playQueueMessage);
}
@Override
public void onError(@NonNull Throwable e) {}
@Override
public void onComplete() {}
};
}
private void onPlayQueueChanged(final PlayQueueMessage event) {
// why no pattern matching in Java =(
switch (event.type()) {
case APPEND:
populateSources();
break;
case SELECT:
if (isCurrentIndexLoaded()) {
sync();
} else {
reset();
}
break;
case REMOVE:
final RemoveEvent removeEvent = (RemoveEvent) event;
remove(removeEvent.index());
break;
case INIT:
case REORDER:
reset();
break;
case ERROR:
case MOVE:
default:
break;
}
if (!isPlayQueueReady()) {
tryBlock();
playQueue.fetch();
} else if (playQueue.isEmpty()) {
playbackListener.shutdown();
} else {
load(); // All event warrants a load
}
if (playQueueReactor != null) playQueueReactor.request(1);
}
/*//////////////////////////////////////////////////////////////////////////
// Internal Helpers
//////////////////////////////////////////////////////////////////////////*/
private boolean isPlayQueueReady() {
return playQueue.isComplete() || playQueue.size() - playQueue.getIndex() > WINDOW_SIZE;
}
private boolean isCurrentIndexLoaded() {
return getCurrentSourceIndex() != -1;
}
private boolean tryBlock() {
if (!isBlocked) {
playbackListener.block();
isBlocked = true;
return true;
}
return false;
}
private boolean tryUnblock() {
if (isPlayQueueReady() && isCurrentIndexLoaded() && isBlocked) {
isBlocked = false;
playbackListener.unblock(sources);
return true;
}
return false;
}
private void sync() {
final PlayQueueItem currentItem = playQueue.getItem();
final Consumer<StreamInfo> syncPlayback = new Consumer<StreamInfo>() {
@Override
public void accept(StreamInfo streamInfo) throws Exception {
playbackListener.sync(streamInfo);
}
};
final Consumer<Throwable> onError = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "Sync error:", throwable);
playbackListener.sync(null);
}
};
syncReactor.set(currentItem.getStream().subscribe(syncPlayback, onError));
}
private void load(@Nullable final PlayQueueItem item) {
if (item == null) return;
final int index = playQueue.indexOf(item);
if (index > sources.getSize() - 1) return;
final DeferredMediaSource mediaSource = (DeferredMediaSource) sources.getMediaSource(playQueue.indexOf(item));
if (mediaSource.state() == DeferredMediaSource.STATE_PREPARED) mediaSource.load();
if (tryUnblock()) sync();
}
private void resetSources() {
if (this.sources != null) this.sources.releaseSource();
if (this.sourceToQueueIndex != null) this.sourceToQueueIndex.clear();
this.sources = new DynamicConcatenatingMediaSource();
}
private void populateSources() {
for (final PlayQueueItem item : playQueue.getStreams()) {
insert(playQueue.indexOf(item), new DeferredMediaSource(item, this));
}
}
/*//////////////////////////////////////////////////////////////////////////
// Media Source List Manipulation
//////////////////////////////////////////////////////////////////////////*/
/**
* Inserts a source into {@link DynamicConcatenatingMediaSource} with position
* in respect to the play queue.
*
* If the play queue index already exists, then the insert is ignored.
* */
private void insert(final int queueIndex, final DeferredMediaSource source) {
if (queueIndex < 0) return;
int pos = Collections.binarySearch(sourceToQueueIndex, queueIndex);
if (pos < 0) {
final int sourceIndex = -pos-1;
sourceToQueueIndex.add(sourceIndex, queueIndex);
sources.addMediaSource(sourceIndex, source);
}
}
/**
* Removes a source from {@link DynamicConcatenatingMediaSource} with the given play queue index.
*
* If the play queue index does not exist, the removal is ignored.
* */
private void remove(final int queueIndex) {
if (queueIndex < 0) return;
final int sourceIndex = sourceToQueueIndex.indexOf(queueIndex);
if (sourceIndex == -1) return;
sourceToQueueIndex.remove(sourceIndex);
sources.removeMediaSource(sourceIndex);
// Will be slow on really large arrays, fast enough for typical use case
for (int i = sourceIndex; i < sourceToQueueIndex.size(); i++) {
sourceToQueueIndex.set(i, sourceToQueueIndex.get(i) - 1);
}
}
}