diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt index 5c7263605b..045ab8b540 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt @@ -18,9 +18,8 @@ data class Download( var pages: List? = null, ) { - @Volatile - @Transient - var totalProgress: Int = 0 + val totalProgress: Int + get() = pages?.sumOf(Page::progress) ?: 0 @Volatile @Transient diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt index 750566943f..8f5ef2a983 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/download/DownloadQueueScreenModel.kt @@ -11,19 +11,21 @@ import eu.kanade.tachiyomi.data.download.model.Download import eu.kanade.tachiyomi.databinding.DownloadListBinding import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.util.system.logcat +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch import logcat.LogPriority -import rx.Observable -import rx.Subscription -import rx.android.schedulers.AndroidSchedulers import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get -import java.util.concurrent.TimeUnit class DownloadQueueScreenModel( private val downloadManager: DownloadManager = Injekt.get(), @@ -40,9 +42,9 @@ class DownloadQueueScreenModel( var adapter: DownloadAdapter? = null /** - * Map of subscriptions for active downloads. + * Map of jobs for active downloads. */ - val progressSubscriptions by lazy { mutableMapOf() } + private val progressJobs = mutableMapOf() val listener = object : DownloadAdapter.DownloadItemListener { /** @@ -130,10 +132,10 @@ class DownloadQueueScreenModel( } override fun onDispose() { - for (subscription in progressSubscriptions.values) { - subscription.unsubscribe() + for (job in progressJobs.values) { + job.cancel() } - progressSubscriptions.clear() + progressJobs.clear() adapter = null } @@ -180,16 +182,16 @@ class DownloadQueueScreenModel( fun onStatusChange(download: Download) { when (download.status) { Download.State.DOWNLOADING -> { - observeProgress(download) + launchProgressJob(download) // Initial update of the downloaded pages onUpdateDownloadedPages(download) } Download.State.DOWNLOADED -> { - unsubscribeProgress(download) + cancelProgressJob(download) onUpdateProgress(download) onUpdateDownloadedPages(download) } - Download.State.ERROR -> unsubscribeProgress(download) + Download.State.ERROR -> cancelProgressJob(download) else -> { /* unused */ } @@ -201,29 +203,25 @@ class DownloadQueueScreenModel( * * @param download the download to observe its progress. */ - private fun observeProgress(download: Download) { - val subscription = Observable.interval(50, TimeUnit.MILLISECONDS) - // Get the sum of percentages for all the pages. - .flatMap { - Observable.from(download.pages) - .map(Page::progress) - .reduce { x, y -> x + y } + private fun launchProgressJob(download: Download) { + val job = coroutineScope.launch { + while (download.pages == null) { + delay(50) } - // Keep only the latest emission to avoid backpressure. - .onBackpressureLatest() - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { progress -> - // Update the view only if the progress has changed. - if (download.totalProgress != progress) { - download.totalProgress = progress + + val progressFlows = download.pages!!.map(Page::progressFlow) + combine(progressFlows, Array::sum) + .distinctUntilChanged() + .debounce(50) + .collectLatest { onUpdateProgress(download) } - } + } - // Avoid leaking subscriptions - progressSubscriptions.remove(download)?.unsubscribe() + // Avoid leaking jobs + progressJobs.remove(download)?.cancel() - progressSubscriptions[download] = subscription + progressJobs[download] = job } /** @@ -231,8 +229,8 @@ class DownloadQueueScreenModel( * * @param download the download to unsubscribe. */ - private fun unsubscribeProgress(download: Download) { - progressSubscriptions.remove(download)?.unsubscribe() + private fun cancelProgressJob(download: Download) { + progressJobs.remove(download)?.cancel() } /**