From 2683cad5b5e5c0e7a01a4349ce7c424e97ca596b Mon Sep 17 00:00:00 2001 From: inorichi Date: Thu, 5 Nov 2015 20:01:37 +0100 Subject: [PATCH] Download queue threads are now updated when the setting change --- app/build.gradle | 1 + .../data/helpers/DownloadManager.java | 13 +- .../data/helpers/PreferencesHelper.java | 30 ++- .../util/DynamicConcurrentMergeOperator.java | 186 ++++++++++++++++++ 4 files changed, 218 insertions(+), 12 deletions(-) create mode 100644 app/src/main/java/eu/kanade/mangafeed/util/DynamicConcurrentMergeOperator.java diff --git a/app/build.gradle b/app/build.gradle index 65c8eb77cf..ada376c806 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -72,6 +72,7 @@ dependencies { compile 'com.jakewharton:disklrucache:2.0.2' compile 'org.jsoup:jsoup:1.8.3' compile 'io.reactivex:rxandroid:1.0.1' + compile 'com.f2prateek.rx.preferences:rx-preferences:1.0.1' compile "com.pushtorefresh.storio:sqlite:$STORIO_VERSION" compile "com.pushtorefresh.storio:sqlite-annotations:$STORIO_VERSION" compile "info.android15.nucleus:nucleus:$NUCLEUS_VERSION" diff --git a/app/src/main/java/eu/kanade/mangafeed/data/helpers/DownloadManager.java b/app/src/main/java/eu/kanade/mangafeed/data/helpers/DownloadManager.java index 3d378c8e2f..8a7400e735 100644 --- a/app/src/main/java/eu/kanade/mangafeed/data/helpers/DownloadManager.java +++ b/app/src/main/java/eu/kanade/mangafeed/data/helpers/DownloadManager.java @@ -22,15 +22,18 @@ import eu.kanade.mangafeed.data.models.Page; import eu.kanade.mangafeed.events.DownloadChapterEvent; import eu.kanade.mangafeed.sources.base.Source; import eu.kanade.mangafeed.util.DiskUtils; +import eu.kanade.mangafeed.util.DynamicConcurrentMergeOperator; import rx.Observable; import rx.Subscription; import rx.schedulers.Schedulers; +import rx.subjects.BehaviorSubject; import rx.subjects.PublishSubject; public class DownloadManager { private PublishSubject downloadsSubject; private Subscription downloadSubscription; + private Subscription threadNumberSubscription; private Context context; private SourceManager sourceManager; @@ -61,14 +64,21 @@ public class DownloadManager { downloadSubscription.unsubscribe(); } + if (threadNumberSubscription != null && !threadNumberSubscription.isUnsubscribed()) + threadNumberSubscription.unsubscribe(); + downloadsSubject = PublishSubject.create(); + BehaviorSubject threads = BehaviorSubject.create(); + + threadNumberSubscription = preferences.getDownloadTheadsObs() + .subscribe(threads::onNext); // Listen for download events, add them to queue and download downloadSubscription = downloadsSubject .subscribeOn(Schedulers.io()) .filter(event -> !isChapterDownloaded(event)) .flatMap(this::prepareDownload) - .flatMap(this::downloadChapter, preferences.getDownloadThreads()) + .lift(new DynamicConcurrentMergeOperator<>(this::downloadChapter, threads)) .onBackpressureBuffer() .subscribe(); } @@ -117,7 +127,6 @@ public class DownloadManager { private Observable downloadChapter(Download download) { return download.source .pullPageListFromNetwork(download.chapter.url) - .subscribeOn(Schedulers.io()) // Add resulting pages to download object .doOnNext(pages -> { download.pages = pages; diff --git a/app/src/main/java/eu/kanade/mangafeed/data/helpers/PreferencesHelper.java b/app/src/main/java/eu/kanade/mangafeed/data/helpers/PreferencesHelper.java index c2b3f7ca01..44d03a8319 100644 --- a/app/src/main/java/eu/kanade/mangafeed/data/helpers/PreferencesHelper.java +++ b/app/src/main/java/eu/kanade/mangafeed/data/helpers/PreferencesHelper.java @@ -4,14 +4,18 @@ import android.content.Context; import android.content.SharedPreferences; import android.preference.PreferenceManager; +import com.f2prateek.rx.preferences.RxSharedPreferences; + import eu.kanade.mangafeed.R; import eu.kanade.mangafeed.sources.base.Source; import eu.kanade.mangafeed.util.DiskUtils; +import rx.Observable; public class PreferencesHelper { - private static SharedPreferences mPref; private Context context; + private SharedPreferences prefs; + private RxSharedPreferences rxPrefs; private static final String SOURCE_ACCOUNT_USERNAME = "pref_source_username_"; private static final String SOURCE_ACCOUNT_PASSWORD = "pref_source_password_"; @@ -20,7 +24,8 @@ public class PreferencesHelper { this.context = context; PreferenceManager.setDefaultValues(context, R.xml.pref_reader, false); - mPref = PreferenceManager.getDefaultSharedPreferences(context); + prefs = PreferenceManager.getDefaultSharedPreferences(context); + rxPrefs = RxSharedPreferences.create(prefs); } private String getKey(int keyResource) { @@ -28,39 +33,44 @@ public class PreferencesHelper { } public void clear() { - mPref.edit().clear().apply(); + prefs.edit().clear().apply(); } public boolean useFullscreenSet() { - return mPref.getBoolean(getKey(R.string.pref_fullscreen_key), false); + return prefs.getBoolean(getKey(R.string.pref_fullscreen_key), false); } public int getDefaultViewer() { - return Integer.parseInt(mPref.getString(getKey(R.string.pref_default_viewer_key), "1")); + return Integer.parseInt(prefs.getString(getKey(R.string.pref_default_viewer_key), "1")); } public String getSourceUsername(Source source) { - return mPref.getString(SOURCE_ACCOUNT_USERNAME + source.getSourceId(), ""); + return prefs.getString(SOURCE_ACCOUNT_USERNAME + source.getSourceId(), ""); } public String getSourcePassword(Source source) { - return mPref.getString(SOURCE_ACCOUNT_PASSWORD + source.getSourceId(), ""); + return prefs.getString(SOURCE_ACCOUNT_PASSWORD + source.getSourceId(), ""); } public void setSourceCredentials(Source source, String username, String password) { - mPref.edit() + prefs.edit() .putString(SOURCE_ACCOUNT_USERNAME + source.getSourceId(), username) .putString(SOURCE_ACCOUNT_PASSWORD + source.getSourceId(), password) .apply(); } public String getDownloadsDirectory() { - return mPref.getString(getKey(R.string.pref_download_directory_key), + return prefs.getString(getKey(R.string.pref_download_directory_key), DiskUtils.getStorageDirectories(context)[0]); } public int getDownloadThreads() { - return Integer.parseInt(mPref.getString(getKey(R.string.pref_download_threads_key), "1")); + return Integer.parseInt(prefs.getString(getKey(R.string.pref_download_threads_key), "1")); + } + + public Observable getDownloadTheadsObs() { + return rxPrefs.getString(getKey(R.string.pref_download_threads_key), "1") + .asObservable().map(Integer::parseInt); } } diff --git a/app/src/main/java/eu/kanade/mangafeed/util/DynamicConcurrentMergeOperator.java b/app/src/main/java/eu/kanade/mangafeed/util/DynamicConcurrentMergeOperator.java new file mode 100644 index 0000000000..821c0c6c6e --- /dev/null +++ b/app/src/main/java/eu/kanade/mangafeed/util/DynamicConcurrentMergeOperator.java @@ -0,0 +1,186 @@ +package eu.kanade.mangafeed.util; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Func1; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; + +public class DynamicConcurrentMergeOperator implements Operator { + final Func1> mapper; + final Observable workerCount; + + public DynamicConcurrentMergeOperator( + Func1> mapper, + Observable workerCount) { + this.mapper = mapper; + this.workerCount = workerCount; + } + + @Override + public Subscriber call(Subscriber t) { + DynamicConcurrentMerge parent = new DynamicConcurrentMerge<>(t, mapper); + t.add(parent); + parent.init(workerCount); + + return parent; + } + + static final class DynamicConcurrentMerge extends Subscriber { + final Subscriber actual; + final Func1> mapper; + final Queue queue; + final CopyOnWriteArrayList> workers; + final CompositeSubscription composite; + final AtomicInteger wipActive; + final AtomicBoolean once; + long id; + + public DynamicConcurrentMerge(Subscriber actual, + Func1> mapper) { + this.actual = actual; + this.mapper = mapper; + this.queue = new ConcurrentLinkedQueue<>(); + this.workers = new CopyOnWriteArrayList<>(); + this.composite = new CompositeSubscription(); + this.wipActive = new AtomicInteger(1); + this.once = new AtomicBoolean(); + this.add(composite); + this.request(0); + } + + public void init(Observable workerCount) { + Subscription wc = workerCount.subscribe(n -> { + int n0 = workers.size(); + if (n0 < n) { + for (int i = n0; i < n; i++) { + DynamicWorker dw = new DynamicWorker<>(++id, this); + workers.add(dw); + request(1); + dw.tryNext(); + } + } else if (n0 > n) { + for (int i = 0; i < n; i++) { + workers.get(i).start(); + } + + for (int i = n0 - 1; i >= n; i--) { + workers.get(i).stop(); + } + } + + if (!once.get() && once.compareAndSet(false, true)) { + request(n); + } + }, this::onError); + + composite.add(wc); + } + + void requestMore(long n) { + request(n); + } + + @Override + public void onNext(T t) { + queue.offer(t); + wipActive.getAndIncrement(); + for (DynamicWorker w : workers) { + w.tryNext(); + } + } + + @Override + public void onError(Throwable e) { + composite.unsubscribe(); + actual.onError(e); + } + + @Override + public void onCompleted() { + if (wipActive.decrementAndGet() == 0) { + actual.onCompleted(); + } + } + } + + static final class DynamicWorker { + final long id; + final AtomicBoolean running; + final DynamicConcurrentMerge parent; + final AtomicBoolean stop; + + public DynamicWorker(long id, DynamicConcurrentMerge parent) { + this.id = id; + this.parent = parent; + this.stop = new AtomicBoolean(); + this.running = new AtomicBoolean(); + } + + public void tryNext() { + if (!running.get() && running.compareAndSet(false, true)) { + T t; + if (stop.get()) { + parent.workers.remove(this); + return; + } + t = parent.queue.poll(); + if (t == null) { + running.set(false); + return; + } + + Observable out = parent.mapper.call(t); + + Subscriber s = new Subscriber() { + @Override + public void onNext(R t) { + parent.actual.onNext(t); + } + + @Override + public void onError(Throwable e) { + parent.onError(e); + } + + @Override + public void onCompleted() { + parent.onCompleted(); + if (parent.wipActive.get() != 0) { + running.set(false); + parent.requestMore(1); + tryNext(); + } + } + }; + + parent.composite.add(s); + s.add(Subscriptions.create(() -> parent.composite.remove(s))); + + // Unchecked assignment to avoid weird Android Studio errors + out.subscribe(s); + } + } + + public void start() { + stop.set(false); + tryNext(); + } + + public void stop() { + stop.set(true); + if (running.compareAndSet(false, true)) { + parent.workers.remove(this); + } + } + } + +} \ No newline at end of file