RxJava转Flow及应用

作者: 支离破碎_SuperLee | 来源:发表于2023-08-23 14:38 被阅读0次

在维护老项目中,由于项目使用了rxjava,但是新代码又大多数使用Kotlin而不想使用rxjava时,尤其是使用flow想做线程同步时可以将rxjava转换为flow进行使用。直接看代码

inline fun <reified T> Observable<T>.toFlow() = callbackFlow {
    val dispose = this@toFlow
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
            trySend(Result.Success(it))
        }, {
            trySend(Result.Failure(it))
        }, {
            trySend(Result.Complete())
        })
    awaitClose {
        dispose.dispose()
    }
}

使用方式

//原有的某rxjava方法
fun getToken(context: Context): Observable<String> = Observable.create {
        //do something
}
//转换后的方法可以直接得到flow
fun getTokenFlow(context: Context) = getToken(context).toFlow()
//调用方法
private fun test() {
     combine(
            flowOf(1,2,3),  getTokenFlow(requireContext())
        ) { r1, r2 ->
               Pair(r1,r2)
        }.catch {
            //do something
        }.onEach {
            val r1 = it.first
            val r2 = it.second
            //do something
        }.onCompletion {
            //do something
//这里的scope必须具备生命周期,否则rxjava将不会释放,自定义scope也需要注意生命周期问题。
        }.launchIn(lifecycleScope).start()
}

Result类采用sealed class,便于使用。

@Keep
sealed class Result<out T> {
    @Keep
    class Success<out T>(val value: T) : Result<T>()

    @Keep
    class Failure(val throwable: Throwable) : Result<Nothing>()

    @Keep
    class Complete : Result<Nothing>()
}

inline fun <reified T> Result<T>.onSuccess(block: (T) -> Unit) = apply {
    if (this is Result.Success) {
        block(value)
    }
}

inline fun <reified T> Result<T>.onFailure(block: (p: Throwable) -> Unit) = apply {
    if (this is Result.Failure) {
        block(throwable)
    }
}

inline fun <reified T> Result<T>.onCompletion(block: () -> Unit) = apply {
    if (this is Result.Complete) {
        block()
    }
}

相关文章

网友评论

    本文标题:RxJava转Flow及应用

    本文链接:https://www.haomeiwen.com/subject/fuppmdtx.html