Remove some unused rx/coroutine bridge code

This commit is contained in:
arkon 2021-01-23 17:18:43 -05:00
parent 0685382083
commit 660e13b701

View File

@ -48,47 +48,6 @@ suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
} }
} }
// suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
// return suspendCancellableCoroutine { continuation ->
// val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
// lateinit var sub: Subscription
// sub = self.subscribe(
// {
// continuation.resume(Unit) {
// sub.unsubscribe()
// }
// },
// {
// if (!continuation.isCancelled) {
// continuation.resumeWithException(it)
// }
// }
// )
//
// continuation.invokeOnCancellation {
// sub.unsubscribe()
// }
// }
// }
//
// suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
// subscribe(
// object : CompletableSubscriber {
// override fun onSubscribe(s: Subscription) {
// cont.unsubscribeOnCancellation(s)
// }
//
// override fun onCompleted() {
// cont.resume(Unit)
// }
//
// override fun onError(e: Throwable) {
// cont.resumeWithException(e)
// }
// }
// )
// }
suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont -> suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
cont.unsubscribeOnCancellation( cont.unsubscribeOnCancellation(
subscribe( subscribe(
@ -105,28 +64,8 @@ suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
) )
} }
// suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
//
// suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T =
// firstOrDefault(default).awaitOne()
//
// suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
//
// suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(
// Observable.fromCallable(
// defaultValue
// )
// ).first().awaitOne()
//
// suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne() suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
// suspend fun <T> Observable<T>.awaitSingleOrDefault(default: T): T =
// singleOrDefault(default).awaitOne()
//
// suspend fun <T> Observable<T>.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne()
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont -> private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
cont.unsubscribeOnCancellation( cont.unsubscribeOnCancellation(
subscribe( subscribe(
@ -184,32 +123,6 @@ fun <T : Any> Observable<T>.asFlow(): Flow<T> = callbackFlow {
awaitClose { subscription.unsubscribe() } awaitClose { subscription.unsubscribe() }
} }
// fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable<T> {
// return Observable.create(
// { emitter ->
// /*
// * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
// * asObservable is already invoked from unconfined
// */
// val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
// try {
// collect { emitter.onNext(it) }
// emitter.onCompleted()
// } catch (e: Throwable) {
// // Ignore `CancellationException` as error, since it indicates "normal cancellation"
// if (e !is CancellationException) {
// emitter.onError(e)
// } else {
// emitter.onCompleted()
// }
// }
// }
// emitter.setCancellation { job.cancel() }
// },
// backpressureMode
// )
// }
fun <T> runAsObservable( fun <T> runAsObservable(
block: suspend () -> T, block: suspend () -> T,
backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE