在维护老项目中,由于项目使用了rxjava,但是新代码又大多数使用Kotlin而不想使用rxjava时,尤其是使用flow想做线程同步时可以将rxjava转换为flow进行使用。直接看代码
inline fun <reified T> Observable<T>.toFlow() = callbackFlow {
val dispose = this@toFlow
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
trySend(Result.Success(it))
}, {
trySend(Result.Failure(it))
}, {
trySend(Result.Complete())
})
awaitClose {
dispose.dispose()
}
}
使用方式
//原有的某rxjava方法
fun getToken(context: Context): Observable<String> = Observable.create {
//do something
}
//转换后的方法可以直接得到flow
fun getTokenFlow(context: Context) = getToken(context).toFlow()
//调用方法
private fun test() {
combine(
flowOf(1,2,3), getTokenFlow(requireContext())
) { r1, r2 ->
Pair(r1,r2)
}.catch {
//do something
}.onEach {
val r1 = it.first
val r2 = it.second
//do something
}.onCompletion {
//do something
//这里的scope必须具备生命周期,否则rxjava将不会释放,自定义scope也需要注意生命周期问题。
}.launchIn(lifecycleScope).start()
}
Result类采用sealed class,便于使用。
@Keep
sealed class Result<out T> {
@Keep
class Success<out T>(val value: T) : Result<T>()
@Keep
class Failure(val throwable: Throwable) : Result<Nothing>()
@Keep
class Complete : Result<Nothing>()
}
inline fun <reified T> Result<T>.onSuccess(block: (T) -> Unit) = apply {
if (this is Result.Success) {
block(value)
}
}
inline fun <reified T> Result<T>.onFailure(block: (p: Throwable) -> Unit) = apply {
if (this is Result.Failure) {
block(throwable)
}
}
inline fun <reified T> Result<T>.onCompletion(block: () -> Unit) = apply {
if (this is Result.Complete) {
block()
}
}
网友评论