- 日常思考,目前Kotlin协程能完全取代Rxjava吗
- Kotlin Coroutines(协程) 完全解析(二),深入
- Kotlin Coroutines(协程) 完全解析(三),封装
- Kotlin Coroutines(协程) 完全解析(四),协程
- Kotlin Coroutines(协程) 完全解析(五),协程
- 破解 Kotlin 协程(8) - Android 篇
- Android中对Kotlin Coroutines(协程)的理
- 协程Flow之FlowPermission权限请求
- 协程Flow之FlowCallAdapterFactory
- kotlin之协程(五),launch 函数以及协程的取消与超时
前言
自从jetbrains
公司提出Kotlin
协程用来解决异步线程问题,并且衍生出来了Flow
作为响应式框架,引来了大量Android开发者的青睐;而目前比较稳定的响应式库当属Rxjava
,这样以来目的就很明显了,旨在用Kotlin
协程来逐步替代掉Rxjava
;
仔细思考下,真的可以完全替代掉Rxjava
么,它的复杂性和多样化的操作符,而协程的许多API
仍然是实验性的,目前为止,随着kt不断地进行版本迭代,越来越趋于稳定,对此我不能妄下断言;当然Rxjava
无疑也是一个非常优秀的框架,值得我们不断深入思考,但是随着协程的出现,就个人而言我会更喜欢使用协程来作为满足日常开发的异步解决方案。
协程的本质和
Rxjava
是截然不同的,所以直接拿它们进行对比是比较棘手的;换一种思路,本文我们从日常开发中的异步问题出发,分别观察协程与Rxjava
是如何提供相应的解决方案,依次来进行比对,探讨下Kotlin
协程是否真的足以取代Rxjava
这个话题吧
流类型的比较
现在我们来看下Rxjava
提供的流类型有哪些,我们可以使用的基本流类型操作符如下图所示
![](https://img.haomeiwen.com/i25149744/ac387b513ba7b16c.png)
它们的基本实现在下文会提及到,这里我们简单来讨论下在协程中是怎么定义这些流操作符的
-
Single<T>
其实就是一个返回不可空值的suspend
函数 -
Maybe<T>
恰好相反,是一个返回可空的supspend
函数 -
Completable
不会发送事件,所以在协程中就是一个不返回任何东西的简单挂起函数 -
对于
Observable
和Flowable
,两者都可以发射多个事件,不同在于前者是没有背压管理的,后者才有,而他们在协程中我们可以直接使用Flow
来完成,在异步数据流中按顺序发出值,所以只需要一个返回当前Data
数据类型的Flow<T>
值得注意的是,该函数本身是不需要
supsend
修饰符的,由于Flow
是冷流,在进行收集\订阅之前是不会发射数据,只要在collect
的时候才需要协程作用域中执行。为什么说Flow
足以替代Observable
和Flowable
原因在与它处理背压(backpressure
)的方式。这自然而然来源于协程中的设计与理念,不需要一些巧妙设计的解决方案来处理显示背压,Flow
中所有Api
基本上都带有suspend
修复符,它也成为了解决背压的关键先生。其目的就是在不阻塞线程的情况下暂停调用者的执行,因此,当Flow<T>
在同一个协程中发射和收集的时候,如果收集器跟不上数据流,它可以简单地暂停元素的发射,直到它准备好接收更多。
流类型比较的基本实现
好的小伙伴们,上文我们简单用协程写出Rxjava
的几个基本流类型,现在让我们用几个详细的实例来看看他们的不同之处吧
Completable ---- 异步任务完成没有结果,可能会抛出错误
在Rxjava
中,我们使用Completable.create
去创建,里面的CompletableEmitter
中有onComplete
表示完成的方法和一个onError
传递异常的方法,如下代码所示
//completable in Rxjava
fun completableRequest(): Completable {
return Completable.create { emitter->
try {
emitter.onComplete()
}catch (e:Exception) {
emitter.onError(e)
}
}
}
fun main() {
completableRequest()
.subscribe {
println("I,am done")
println()
}
}
在协程当中,我们对应的就是调用一个不返回任何内容的挂起函数(returns Unit
),就类似于我们调用一个普通函数一样
fun completableCoroutine() = runBlocking {
try {
delay(500L)
println("I am done")
} catch (e: Exception) {
println("Got an exception")
}
}
注意不要在生产环境代码使用
runBlocking
,你应该有一个合适的CoroutineScope
,由于是测试代码本文都将使用runBlocking
来辅助说明测试场景
Single ---- 必须返回或抛出错误的异步任务
在 RxJava
中,我们使用一个Single
,它里面有一个onSuccess
传递返回值的方法和一个onError
传递异常的方法。
```kotlin
/**
* Single in RxJava
*/
fun main() {
singleResult()
.subscribe(
{ result -> println(result) },
{ println("Got an exception") }
)
}
fun singleResult(): Single<String> {
return Single.create { emitter ->
try {
// process a request
emitter.onSuccess("Some result")
} catch (e: Exception) {
emitter.onError(e)
}
}
而在协程中,我们调用一个返回**非空值**的挂起函数:
/**
- Single equivalent in coroutines
*/
fun main() = runBlocking {
try {
val result = getResult()
println(result)
} catch (e: Exception) {
println("Got an exception")
}
}
suspend fun getResult(): String {
// process a request
delay(100)
return "Some result"
}
##### Maybe --- 可能返回结果或抛出错误的异步任务
在 `RxJava` 中,我们使用一个`Maybe`. 它里面有一个`onSuccess`传递返回值的方法`onComplete`,一个在没有值的情况下发出完成信号的方法,以及一个`onError`传递异常的方法。
/**
- Maybe in RxJava
*/
fun main() {
maybeResult()
.subscribe(
{ result -> println(result) },
{ println("Got an exception") },
{ println("Completed without a value!") }
)
}
fun maybeResult(): Maybe<String> {
return Maybe.create { emitter ->
try {
// process a request
if (Random.nextBoolean()) {
emitter.onSuccess("Some value")
} else {
emitter.onComplete()
}
} catch (e: Exception) {
emitter.onError(e)
}
}
}
在协程中,我们调用一个返回可空值得挂起函数
/**
- Maybe equivalent in coroutines
*/
fun main() = runBlocking {
try {
val result = getNullableResult()
if (result != null) {
println(result)
} else {
println("Completed without a value!")
}
} catch (e: Exception) {
println("Got an exception")
}
}
suspend fun getNullableResult(): String? {
// process a request
delay(100)
return if (Random.nextBoolean()) {
"Some value"
} else {
null
}
}
##### 0..N事件的异步流
由于在`Rxjava`中,`Flowable`和`Observable`都是属于`0..N`事件的异步流,但是`Observable`几乎没有做相应的背压管理,所以这里我们主要以`Flowable`为例子,`onNext`发出下一个流值的方法,一个`onComplete`表示流完成的方法,以及一个`onError`传递异常的方法。
/**
- Flowable in RxJava
*/
fun main() {
flowableValues()
.subscribe(
{ value -> println(value) },
{ println("Got an exception") },
{ println("I'm done") }
)
}
fun flowableValues(): Flowable<Int> {
val flowableEmitter = { emitter: FlowableEmitter<Int> ->
try {
for (i in 1..10) {
emitter.onNext(i)
}
} catch (e: Exception) {
emitter.onError(e)
} finally {
emitter.onComplete()
}
}
return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}
在协程中,我们只是创建一个`Flow`就可以完成这个方法
/**
- Flow in Kotlin
*/
fun main() = runBlocking {
try {
eventFlow().collect { value ->
println(value)
}
println("I'm done")
} catch (e: Exception) {
println("Got an exception")
}
}
fun eventFlow() = flow {
for (i in 1..10) {
emit(i)
}
}
> 在惯用的 `Kotlin` 中,创建上述流程的方法之一是:`fun eventFlow() = (1..10).asFlow()`
如上面这些代码所见,我们基本可以使用协程涵盖`Rxjava`所有的主要基本用法,此外,协程的设计允许我们使用所有标准的`Kotlin`功能编写典型的顺序代码 ,它还消除了对`onComplete`或`onError`回调的需要。我们可以像在普通代码中那样捕获错误或设置协程异常处理程序。并且,考虑到当挂起函数完成时,协程继续按顺序执行,我们可以在下一行继续编写我们的“完成逻辑”。
**值得注意的是,当我们进行调用`collect`收集的时候也是如此,在收集完所有元素后才会执行下一行代码**
eventFlow().collect { value ->
println(value)
}
println("I'm done")
> `Flow`收集完所有元素后,才会调用打印`I'm done`
### 操作符的比较
总所周知,`Rxjava`的主要优势在于它拥有非常多的操作符,基本上可以应对日常开发中出现的各种情况,由于它种类特别繁多又比较难记忆,这里我只简单举些常见的操作符进行比较
#### `COMPLETABLE`,`SINGLE`, `MAYBE`
这里需要强调的是,在`Rxjava`中`Completable`,`Single`和`Maybe`都有许多相同的操作符,然而在协程中任何类型的操作符其实都是多余的,我们以`Single`中的`map()`简单操作符为例来看下:
/**
- Maps Single<String> to
- Single<User> synchronously
*/
fun main() {
getUsername()
.map { username ->
User(username)
}
.subscribe(
{ user -> println(user) },
{ println("Got an exception") }
)
}
`map`作为`Rxjava`中最常用的操作符,获取一个值并将其转换为另一个值,但是在协程中我们不需要`.map()`操作符就可以实现这种操作
fun main() = runBlocking {
try {
val username = getUsername() // suspend fun
val user = User(username)
println(user)
} catch (e: Exception) {
println("Got an exception")
}
}
使用`suspend`挂起函数可以挂起当前函数,当执行完毕后在按顺序执行接下来的代码
#### `Flow`操作符与`Rxjava`操作符
现在让我们看看`Flow`中有哪些操作符,它们与`Rxjava`相比有什么不同,由于篇幅原因,这里我简单比较下日常开发中最常用的操作符
##### `map()`
对于`map`操作符,`Flow`中也具有相同的操作符
/**
- Maps Flow<String> to Flow<User>
*/
fun main() = runBlocking {
usernameFlow()
.map { username ->
User(username)
}
.collect { user ->
println(user)
}
}
`Flow`中的`map`操作符 相当于`Rxjava`做了一定的简化处理,这是它的一个主要优势,可以看下它的源码
fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R> = flow {
collect { value -> emit(transform(value)) }
}
是不是非常简单,只是重新创建一个新的`flow`,它从从上游收集值`transform`并在当前函数应用后发出这些值;事实上大多数`Flow`的操作符都是这样工作的,不需要遵循严格的协议;对于大多数应用场景,标准`Flow`操作符就已经足够了,当然编写自定义操作符也是非常简单容易的;相对于`Rxjava`,如果想要编写自定义操作符,你必须非常了解`Rxjava`的
[`Reactive Streams`协议](https://link.juejin.cn?target=https%3A%2F%2Fwww.reactive-streams.org%2F "https://www.reactive-streams.org/")
##### `flatmap()`
另外,在`Rxjava`中我们经常使用的操作符还有`flatmap()`,同时还有很多种变体,例如`.flatMapSingle()`,`flatMapObservable()`,`flatMapIterable()`等,简单来说,在`Rxjava`中我们如果需要对一个值进行**同步转换**,就使用`map`,进行**异步转换**的时候就需要使用`flatMap()`;对此,`Flow`进行同步或者异步转换的时候不需要不同的操作符,仅仅使用`map`就足够了,由于它们都有`supsend`挂起函数进行修饰,不用担心同步性
可以看下在`Rxjava`中的示例
fun compareFlatMap() {
getUsernames() //Flowable<String>
.flatMapSingle { username ->
getUserFromNetwork(username) // Single<User>
}
.subscribe(
{ user -> println(user) },
{ println("Got an exception") }
)
}
好的,我们使用`Flow`来转换下上述的这一段代码,只需要使用`map`就可以以任何方式进行转换值,如下代码所示:
runBlocking {
flow {
emit(User("Jacky"))
}.map {
getUserFromName(it) //suspend
}.collect {
println(it)
}
}
suspend fun getUserFromName(user: User): String {
return user.userName
}
实际上使用`Flow`中的`map`操作符,就可以**将上游流发出的值转换为新流,然后将所有流扁平化为一个**,这和`flatMap`的功能几乎可以达到同样的效果
##### `filter()`
对于`filter`操作符,我们在`Rxjava`中并没有直接的方法进行异步过滤,这需要我们自己编写代码来进行过滤判断,如下所示
fun getUsernames(): Flowable<String> {
val flowableEmitter = { emitter: FlowableEmitter<String> ->
emitter.onNext("Jacky")
}
return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}
fun isCorrectUserName(userName: String): Single<Boolean> {
return Single.create { emitter ->
runCatching {
//名字判断....
if (userName.isNotEmpty()) {
emitter.onSuccess(true)
} else {
emitter.onSuccess(false)
}
}.onFailure {
emitter.onError(it)
}
}
}
fun compareFilter() {
getUsernames()//Flowable<String>
.flatMapSingle { userName ->
isCorrectUserName(userName)
.flatMap { isCorrect ->
if (isCorrect) {
Single.just(userName)
} else {
Single.never()
}
}
}.subscribe {
println(it)
}
}
乍一看,是不是感觉有点麻烦,事实上这确实需要我们使用些小手段才能达到目的;而在`Flow`中,我们能够轻松地根据同步和异步调用过滤流
runBlocking {
userNameFlow().filter { user ->
isCorrectName(user.userName)
}.collect { user->
println(user)
}
}
suspend fun isCorrectName(userName: String): Boolean {
return userName.isNotEmpty()
}
### 结语
由于篇幅原因,`Rxjava`和协程都是一个非常庞大的思考话题,它们之间的不同比较可以永远进行下去;事实上,在`Kotlin`协程被广泛使用之前,`Rxjava`作为项目中主要的异步解决方案,以至于到现在工作上还有很多项目用着`Rxjava`, 所以即使切换到`Kotlin`协程之后,还有相当长一段时间还在用着`Rxjava`;这并不代表`Rxjava`不够好,而是协程让代码变得更易读,更易于使用;
暂时先告一段落了,事实上证明协程确实能够满足我们日常开发的主要需求,下次将会对`Rxjava`中的背压和之前所讨论的`Flow`背压问题进行比较探讨,还有非常多的东西要学,共勉!!!!
本文主要内容译至 -> [www.javaadvent.com/2021/12/are…](https://link.juejin.cn?target=https%3A%2F%2Fwww.javaadvent.com%2F2021%2F12%2Fare-kotlin-coroutines-enough-to-replace-rxjava.html "https://www.javaadvent.com/2021/12/are-kotlin-coroutines-enough-to-replace-rxjava.html")
>作者:RainyJiang
链接:https://juejin.cn/post/7175803413232844855
网友评论