NewPipe/app/src/main/java/org/schabi/newpipe/local/subscription/services/SubscriptionsImportService....

314 lines
12 KiB
Java

/*
* Copyright 2018 Mauricio Colli <mauriciocolli@outlook.com>
* SubscriptionsImportService.java is part of NewPipe
*
* License: GPL-3.0+
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.schabi.newpipe.local.subscription.services;
import static org.schabi.newpipe.MainActivity.DEBUG;
import static org.schabi.newpipe.streams.io.StoredFileHelper.DEFAULT_MIME;
import android.content.Intent;
import android.net.Uri;
import android.text.TextUtils;
import android.util.Log;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.localbroadcastmanager.content.LocalBroadcastManager;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.schabi.newpipe.App;
import org.schabi.newpipe.R;
import org.schabi.newpipe.database.subscription.SubscriptionEntity;
import org.schabi.newpipe.extractor.NewPipe;
import org.schabi.newpipe.extractor.channel.ChannelInfo;
import org.schabi.newpipe.extractor.subscription.SubscriptionItem;
import org.schabi.newpipe.ktx.ExceptionUtils;
import org.schabi.newpipe.streams.io.SharpInputStream;
import org.schabi.newpipe.streams.io.StoredFileHelper;
import org.schabi.newpipe.util.Constants;
import org.schabi.newpipe.util.ExtractorHelper;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Notification;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class SubscriptionsImportService extends BaseImportExportService {
public static final int CHANNEL_URL_MODE = 0;
public static final int INPUT_STREAM_MODE = 1;
public static final int PREVIOUS_EXPORT_MODE = 2;
public static final String KEY_MODE = "key_mode";
public static final String KEY_VALUE = "key_value";
/**
* A {@link LocalBroadcastManager local broadcast} will be made with this action
* when the import is successfully completed.
*/
public static final String IMPORT_COMPLETE_ACTION = App.PACKAGE_NAME + ".local.subscription"
+ ".services.SubscriptionsImportService.IMPORT_COMPLETE";
/**
* How many extractions running in parallel.
*/
public static final int PARALLEL_EXTRACTIONS = 8;
/**
* Number of items to buffer to mass-insert in the subscriptions table,
* this leads to a better performance as we can then use db transactions.
*/
public static final int BUFFER_COUNT_BEFORE_INSERT = 50;
private Subscription subscription;
private int currentMode;
private int currentServiceId;
@Nullable
private String channelUrl;
@Nullable
private InputStream inputStream;
@Nullable
private String inputStreamType;
@Override
public int onStartCommand(final Intent intent, final int flags, final int startId) {
if (intent == null || subscription != null) {
return START_NOT_STICKY;
}
currentMode = intent.getIntExtra(KEY_MODE, -1);
currentServiceId = intent.getIntExtra(Constants.KEY_SERVICE_ID, Constants.NO_SERVICE_ID);
if (currentMode == CHANNEL_URL_MODE) {
channelUrl = intent.getStringExtra(KEY_VALUE);
} else {
final Uri uri = intent.getParcelableExtra(KEY_VALUE);
if (uri == null) {
stopAndReportError(new IllegalStateException(
"Importing from input stream, but file path is null"),
"Importing subscriptions");
return START_NOT_STICKY;
}
try {
final StoredFileHelper fileHelper = new StoredFileHelper(this, uri, DEFAULT_MIME);
inputStream = new SharpInputStream(fileHelper.getStream());
inputStreamType = fileHelper.getType();
if (inputStreamType == null || inputStreamType.equals(DEFAULT_MIME)) {
// mime type could not be determined, just take file extension
final String name = fileHelper.getName();
final int pointIndex = name.lastIndexOf('.');
if (pointIndex == -1 || pointIndex >= name.length() - 1) {
inputStreamType = DEFAULT_MIME; // no extension, will fail in the extractor
} else {
inputStreamType = name.substring(pointIndex + 1);
}
}
} catch (final IOException e) {
handleError(e);
return START_NOT_STICKY;
}
}
if (currentMode == -1 || currentMode == CHANNEL_URL_MODE && channelUrl == null) {
final String errorDescription = "Some important field is null or in illegal state: "
+ "currentMode=[" + currentMode + "], "
+ "channelUrl=[" + channelUrl + "], "
+ "inputStream=[" + inputStream + "]";
stopAndReportError(new IllegalStateException(errorDescription),
"Importing subscriptions");
return START_NOT_STICKY;
}
startImport();
return START_NOT_STICKY;
}
@Override
protected int getNotificationId() {
return 4568;
}
@Override
public int getTitle() {
return R.string.import_ongoing;
}
@Override
protected void disposeAll() {
super.disposeAll();
if (subscription != null) {
subscription.cancel();
}
}
/*//////////////////////////////////////////////////////////////////////////
// Imports
//////////////////////////////////////////////////////////////////////////*/
private void startImport() {
showToast(R.string.import_ongoing);
Flowable<List<SubscriptionItem>> flowable = null;
switch (currentMode) {
case CHANNEL_URL_MODE:
flowable = importFromChannelUrl();
break;
case INPUT_STREAM_MODE:
flowable = importFromInputStream();
break;
case PREVIOUS_EXPORT_MODE:
flowable = importFromPreviousExport();
break;
}
if (flowable == null) {
final String message = "Flowable given by \"importFrom\" is null "
+ "(current mode: " + currentMode + ")";
stopAndReportError(new IllegalStateException(message), "Importing subscriptions");
return;
}
flowable.doOnNext(subscriptionItems ->
eventListener.onSizeReceived(subscriptionItems.size()))
.flatMap(Flowable::fromIterable)
.parallel(PARALLEL_EXTRACTIONS)
.runOn(Schedulers.io())
.map((Function<SubscriptionItem, Notification<ChannelInfo>>) subscriptionItem -> {
try {
return Notification.createOnNext(ExtractorHelper
.getChannelInfo(subscriptionItem.getServiceId(),
subscriptionItem.getUrl(), true)
.blockingGet());
} catch (final Throwable e) {
return Notification.createOnError(e);
}
})
.sequential()
.observeOn(Schedulers.io())
.doOnNext(getNotificationsConsumer())
.buffer(BUFFER_COUNT_BEFORE_INSERT)
.map(upsertBatch())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getSubscriber());
}
private Subscriber<List<SubscriptionEntity>> getSubscriber() {
return new Subscriber<List<SubscriptionEntity>>() {
@Override
public void onSubscribe(final Subscription s) {
subscription = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(final List<SubscriptionEntity> successfulInserted) {
if (DEBUG) {
Log.d(TAG, "startImport() " + successfulInserted.size()
+ " items successfully inserted into the database");
}
}
@Override
public void onError(final Throwable error) {
Log.e(TAG, "Got an error!", error);
handleError(error);
}
@Override
public void onComplete() {
LocalBroadcastManager.getInstance(SubscriptionsImportService.this)
.sendBroadcast(new Intent(IMPORT_COMPLETE_ACTION));
showToast(R.string.import_complete_toast);
stopService();
}
};
}
private Consumer<Notification<ChannelInfo>> getNotificationsConsumer() {
return notification -> {
if (notification.isOnNext()) {
final String name = notification.getValue().getName();
eventListener.onItemCompleted(!TextUtils.isEmpty(name) ? name : "");
} else if (notification.isOnError()) {
final Throwable error = notification.getError();
final Throwable cause = error.getCause();
if (error instanceof IOException) {
throw error;
} else if (cause instanceof IOException) {
throw cause;
} else if (ExceptionUtils.isNetworkRelated(error)) {
throw new IOException(error);
}
eventListener.onItemCompleted("");
}
};
}
private Function<List<Notification<ChannelInfo>>, List<SubscriptionEntity>> upsertBatch() {
return notificationList -> {
final List<ChannelInfo> infoList = new ArrayList<>(notificationList.size());
for (final Notification<ChannelInfo> n : notificationList) {
if (n.isOnNext()) {
infoList.add(n.getValue());
}
}
return subscriptionManager.upsertAll(infoList);
};
}
private Flowable<List<SubscriptionItem>> importFromChannelUrl() {
return Flowable.fromCallable(() -> NewPipe.getService(currentServiceId)
.getSubscriptionExtractor()
.fromChannelUrl(channelUrl));
}
private Flowable<List<SubscriptionItem>> importFromInputStream() {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(inputStreamType);
return Flowable.fromCallable(() -> NewPipe.getService(currentServiceId)
.getSubscriptionExtractor()
.fromInputStream(inputStream, inputStreamType));
}
private Flowable<List<SubscriptionItem>> importFromPreviousExport() {
return Flowable.fromCallable(() -> ImportExportJsonHelper.readFrom(inputStream, null));
}
protected void handleError(@NonNull final Throwable error) {
super.handleError(R.string.subscriptions_import_unsuccessful, error);
}
}