Replace RxJava in HttpPageLoader downloader (#8955)

* Convert downloader Observable to flow

Uses `runInterruptible` to turn the blocking call to `queue.take()`
into a cancellable call.

Flow collection is ended by cancelling the scope in `recycle`. This
means the `HttpPageLoader` can't be reused after calling `recycle`,
but this was true with the `Observable` as well.)

* Convert load Observables to suspending function

Inlining the Observables allows for some simplification of the error
handling. Behavior should be otherwise identical.

* Convert cleanup Completable to coroutine

Uses global `launchIO`, not ideal but similar to previous behavior.
Can't be scheduled on the local `scope` as this runs after `scope` is
cancelled.
This commit is contained in:
Two-Ai 2023-01-21 16:46:16 -05:00 committed by GitHub
parent a179327d9d
commit e4bc8990fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -6,16 +6,20 @@ import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.source.online.HttpSource
import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
import eu.kanade.tachiyomi.ui.reader.model.ReaderPage import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
import eu.kanade.tachiyomi.util.lang.plusAssign import eu.kanade.tachiyomi.util.lang.awaitSingle
import eu.kanade.tachiyomi.util.system.logcat import eu.kanade.tachiyomi.util.lang.launchIO
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import logcat.LogPriority import kotlinx.coroutines.CoroutineScope
import rx.Completable import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible
import rx.Observable import rx.Observable
import rx.schedulers.Schedulers import rx.schedulers.Schedulers
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import rx.subjects.SerializedSubject import rx.subjects.SerializedSubject
import rx.subscriptions.CompositeSubscription
import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get import uy.kohesive.injekt.api.get
import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.PriorityBlockingQueue
@ -31,33 +35,27 @@ class HttpPageLoader(
private val chapterCache: ChapterCache = Injekt.get(), private val chapterCache: ChapterCache = Injekt.get(),
) : PageLoader() { ) : PageLoader() {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
/** /**
* A queue used to manage requests one by one while allowing priorities. * A queue used to manage requests one by one while allowing priorities.
*/ */
private val queue = PriorityBlockingQueue<PriorityPage>() private val queue = PriorityBlockingQueue<PriorityPage>()
/**
* Current active subscriptions.
*/
private val subscriptions = CompositeSubscription()
private val preloadSize = 4 private val preloadSize = 4
init { init {
subscriptions += Observable.defer { Observable.just(queue.take().page) } scope.launchIO {
.filter { it.status == Page.State.QUEUE } flow {
.concatMap { source.fetchImageFromCacheThenNet(it) } while (true) {
.repeat() emit(runInterruptible { queue.take() }.page)
.subscribeOn(Schedulers.io()) }
.subscribe( }
{ .filter { it.status == Page.State.QUEUE }
}, .collect {
{ error -> loadPage(it)
if (error !is InterruptedException) { }
logcat(LogPriority.ERROR, error) }
}
},
)
} }
/** /**
@ -65,21 +63,23 @@ class HttpPageLoader(
*/ */
override fun recycle() { override fun recycle() {
super.recycle() super.recycle()
subscriptions.unsubscribe() scope.cancel()
queue.clear() queue.clear()
// Cache current page list progress for online chapters to allow a faster reopen // Cache current page list progress for online chapters to allow a faster reopen
val pages = chapter.pages val pages = chapter.pages
if (pages != null) { if (pages != null) {
Completable launchIO {
.fromAction { try {
// Convert to pages without reader information // Convert to pages without reader information
val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) } val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) }
chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave) chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave)
} catch (e: Throwable) {
if (e is CancellationException) {
throw e
}
} }
.onErrorComplete() }
.subscribeOn(Schedulers.io())
.subscribe()
} }
} }
@ -192,61 +192,32 @@ class HttpPageLoader(
} }
/** /**
* Returns an observable of the page with the downloaded image. * Loads the page, retrieving the image URL and downloading the image if necessary.
* Downloaded images are stored in the chapter cache.
* *
* @param page the page whose source image has to be downloaded. * @param page the page whose source image has to be downloaded.
*/ */
private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable<ReaderPage> { private suspend fun loadPage(page: ReaderPage) {
return if (page.imageUrl.isNullOrEmpty()) { try {
getImageUrl(page).flatMap { getCachedImage(it) } if (page.imageUrl.isNullOrEmpty()) {
} else { page.status = Page.State.LOAD_PAGE
getCachedImage(page) page.imageUrl = source.fetchImageUrl(page).awaitSingle()
}
val imageUrl = page.imageUrl!!
if (!chapterCache.isImageInCache(imageUrl)) {
page.status = Page.State.DOWNLOAD_IMAGE
val imageResponse = source.fetchImage(page).awaitSingle()
chapterCache.putImageToCache(imageUrl, imageResponse)
}
page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
page.status = Page.State.READY
} catch (e: Throwable) {
page.status = Page.State.ERROR
if (e is CancellationException) {
throw e
}
} }
} }
private fun HttpSource.getImageUrl(page: ReaderPage): Observable<ReaderPage> {
page.status = Page.State.LOAD_PAGE
return fetchImageUrl(page)
.doOnError { page.status = Page.State.ERROR }
.onErrorReturn { null }
.doOnNext { page.imageUrl = it }
.map { page }
}
/**
* Returns an observable of the page that gets the image from the chapter or fallbacks to
* network and copies it to the cache calling [cacheImage].
*
* @param page the page.
*/
private fun HttpSource.getCachedImage(page: ReaderPage): Observable<ReaderPage> {
val imageUrl = page.imageUrl ?: return Observable.just(page)
return Observable.just(page)
.flatMap {
if (!chapterCache.isImageInCache(imageUrl)) {
cacheImage(page)
} else {
Observable.just(page)
}
}
.doOnNext {
page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
page.status = Page.State.READY
}
.doOnError { page.status = Page.State.ERROR }
.onErrorReturn { page }
}
/**
* Returns an observable of the page that downloads the image to [ChapterCache].
*
* @param page the page.
*/
private fun HttpSource.cacheImage(page: ReaderPage): Observable<ReaderPage> {
page.status = Page.State.DOWNLOAD_IMAGE
return fetchImage(page)
.doOnNext { chapterCache.putImageToCache(page.imageUrl!!, it) }
.map { page }
}
} }