Kotlin 协程中使用挂起函数可以实现非阻塞地执行任务并将结果返回回来,但是只能返回一个计算结果。但是如果希望有多个计算结果返回回来,则可以使用 flow,flow有像Rxjava的各种操作符,实现各种功能,同时和协程一起使用,可以替代Rxjava和liveData,并且也没有像Rxjava上手这么难,所以学kotlin,flow是必须的。
flow简单使用:
flow{
//发送者发送数值
emit(1)
}.collect{
//接受者接受发送的数值
println(it.toString())
}
看起来和Rxjava很像,但是又简单很多吧
flow的冷流与热流
-
冷流
上面的简单使用即是冷流,即执行是惰性的,调用末端流操作符(collect 是其中之一)之前, flow{ ... } 中的代码不会执行,只有当数据被订阅的时候(执行collect),发布者才开始执行发射数据流的代码(执行flow{ ... })。当有多个订阅者的时候,每个订阅者都会收到发送者完整的流程。即订阅者和发送者都是一对一的关系。
例子如下:
image.png
我们准备3个按钮,分别对应代码如下:
//发送者代码:
var test: Flow<Int>?=null
test= flow {
for (i in 0..4) {
Log.e(TAG+"2", i.toString())
emit(i)
}
//订阅者1代码:
test?.collect{
delay(1000)
Log.e(TAG, it.toString())
}
//订阅者2代码:
test?.collect{
delay(1000)
Log.e(TAG, it.toString())
}
上面三个按钮的代码都贴上去了,其中订阅者1和订阅者2代码一样,当我们只是点发送者按钮时,flow {...} flow里面的代码块是没有执行的,然后我们再点击订阅者1按钮,这时候发送者代码才开始执行,从而发送给订阅者,连续执行
image.png
当我们再点击订阅者2按钮的时候,会发现和上面的订阅者1按钮的效果一样,所以印证了一对一的关系,每个订阅者都会收到发送者完整的流程。
- 热流
热流是共享的,有缓存的,不管订阅者是否存在,只要发送了事件就会被消费,热流和订阅者是一对多的关系,多个订阅者可以共享同一个数据流。当一个订阅者停止监听时,数据流不会自动关闭。以MutableSharedFlow举例:
val mutableSharedFlow=MutableSharedFlow<Int>()
lifecycleScope.launch {
mutableSharedFlow.collect{
Log.e("mutableSharedFlow1",it.toString())
}
}
lifecycleScope.launch {
(1..5).forEach {
Log.e("mutableSharedFlow1_before",it.toString())
mutableSharedFlow.emit(it)
Log.e("mutableSharedFlow1_after",it.toString())
}
}
lifecycleScope.launch {
delay(1000)
mutableSharedFlow.collect {
Log.e("mutableSharedFlow2",it.toString())
}
}
运行结果如下:
image.png
可以看到,第二个订阅者是没有收到发送者的数据,因为在订阅之前已经被消费了,所以收不到数据
热流的具体实现SharedFlow和StateFlow,分别对应的实现类MutableSharedFlow和是MutableStateFlow,所以我们要讲的也就是这两个类。
1. MutableSharedFlow
有缓冲区区,并可以定义缓冲区的溢出规则,可以定义给一个新的接收器发送多少数据的缓存值。
MutableSharedFlow 的参数如下:
- replay 相当于粘性数据
- extraBufferCapacity //接受的慢时候,发送的入栈
- onBufferOverflow 缓冲区溢出规则:
- SUSPEND: 挂起
- DROP_OLDEST: 移除旧的值
- DROP_LATEST: 移除新的值
replay:事件粘滞数
当我们把上面的MutableSharedFlow的replay设置为1是,即如下代码:
val mutableSharedFlow=MutableSharedFlow<Int>(replay = 1)
lifecycleScope.launch {
mutableSharedFlow.collect{
Log.e("mutableSharedFlow1",it.toString())
}
}
lifecycleScope.launch {
(1..5).forEach {
mutableSharedFlow.emit(it)
}
}
lifecycleScope.launch {
delay(1000)
mutableSharedFlow.collect {
Log.e("mutableSharedFlow2",it.toString())
}
}
运行结果如下:
image.png
可以看到,第二个订阅者收到了最后一次运行的结果5,所以replay会保留上次运行的结果,replay设置多少,他就保留最新的前多少数据。
extraBufferCapacity
缓存容量,就是先发送几个事件,不管已经订阅的消费者是否接收,都先发送先。
val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2)
lifecycleScope.launch {
mutableSharedFlow.collect{
Log.e("mutableSharedFlow1",it.toString())
}
}
lifecycleScope.launch {
(1..5).forEach {
Log.e("mutableSharedFlow1_before",it.toString())
mutableSharedFlow.emit(it)
Log.e("mutableSharedFlow1_after",it.toString())
}
}
lifecycleScope.launch {
delay(1000)
mutableSharedFlow.collect {
Log.e("mutableSharedFlow2",it.toString())
}
}
运行结果如下:
image.png
相对于上面,可以看到extraBufferCapacity设置2之后,头两个会先发送而不管有没有被消费完,超过第3个之后,才开始执行,执行完之后又先发送先发送两个而不管有没有被消费。
onBufferOverflow
因为有第二个参数,所以当没有被消费完的时候,这可能导致缓存容量过多,只管发不管消费者消费能力的情况就会出现背压,所以第3个参数就是出现背压的时候要怎么处理的。
分别是 SUSPEND: 挂起,DROP_OLDEST: 移除旧的值,DROP_LATEST: 移除新的值。
SUSPEND
因为默认就是SUSPEND,所以上面的MutableSharedFlow<Int>(extraBufferCapacity = 2)
就是MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.SUSPEND)
,所以和讲extraBufferCapacity的demo是一样的。
DROP_OLDEST
移除旧的值,保留最新的,extraBufferCapacity就保留多少,代码如下:
val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_OLDEST)
lifecycleScope.launch {
mutableSharedFlow.collect{
Log.e("mutableSharedFlow1",it.toString())
}
}
lifecycleScope.launch {
(1..5).forEach {
Log.e("mutableSharedFlow1_before",it.toString())
mutableSharedFlow.emit(it)
Log.e("mutableSharedFlow1_after",it.toString())
}
}
lifecycleScope.launch {
delay(1000)
mutableSharedFlow.collect {
Log.e("mutableSharedFlow2",it.toString())
}
}
运行效果如下:
image.png可以看到运行5个,超出缓存容量,只保留最新的两个,这就实现了消费者消费速度小于生产者的时候的背压问题。
DROP_LATEST
移除新的值,保留最旧的,extraBufferCapacity就保留多少,代码如下:
val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST)
lifecycleScope.launch {
mutableSharedFlow.collect{
Log.e("mutableSharedFlow1",it.toString())
}
}
lifecycleScope.launch {
(1..5).forEach {
Log.e("mutableSharedFlow1_before",it.toString())
mutableSharedFlow.emit(it)
Log.e("mutableSharedFlow1_after",it.toString())
}
}
lifecycleScope.launch {
delay(1000)
mutableSharedFlow.collect {
Log.e("mutableSharedFlow2",it.toString())
}
}
运行结果如下:
image.png可以看到运行5个,超出缓存容量,只保留最旧的两个。
2.MutableStateFlow
MutableStateFlow 就是reply为1的MutableSharedFlow,同时它必须要有一个初始值,此外每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。具体demo如下:
val stateFlow =MutableStateFlow(value = -1)
lifecycleScope.launch {
stateFlow.collect{
Log.e("mutableStateFlow1",it.toString())
}
}
lifecycleScope.launch {
listOf(1,2,3,4,4).forEach {
Log.e("mutableStateFlow_before",it.toString())
stateFlow.emit(it)
Log.e("mutableStateFlow_after",it.toString())
}
}
lifecycleScope.launch {
delay(1000)
stateFlow.collect {
Log.e("mutableStateFlow2",it.toString())
}
}
运行结果如下:
image.png
可以看到,只要初始值和最新值,其他的值都不会,StateFlow重点在状态,只有初始值和最新值,而不会有中间值,这对于UI的状态更合适,防止重复刷新,而SharedFlow更适合事件的处理。
背压三剑客
从上面的讲解里,我们了解了MutableSharedFlow和MutableStateFlow的背压。
那冷流要怎么实现呢,其实操作符也有背压处理的。
背压说白了就是消费者的消费速度达不到生产者的创建速度时,就会产生数据的淤积。
- collectLatest
当出现背压是,只会执行最新的数据,代码如下:
flow {
(1..5).forEach{
emit(it)
}
}.collectLatest {
Log.e("collectLatest_start",it.toString())
delay(1000)
Log.e("collectLatest_end",it.toString())
}
运行结果如下:
image.png可以看到,会结束旧的数据执行即使在执行中,而执行最新的数据
- conflate
conflate 与collectLatest不同的是,conflate会先把旧的执行完,再去执行最新的数据,保证每次执行有始有终,而不会中途中断
flow {
(1..5).forEach{
emit(it)
}
}.conflate()
.collect {
Log.e("conflate_start",it.toString())
delay(1000)
Log.e("conflate_end",it.toString())
}
运行结果:
image.png可以看到,对1数据也执行到结束才执行5,中间的数据直接过滤掉,有始有终
- buffer
首先,buffer的数据发送就不会受collect函数的影响,不用等collect执行完后才发送下一条,其二,buffer也有点像MutableSharedFlow,有两个参数,分别是capacity和onBufferOverflow,
- capacity: 缓存数量
- onBufferOverflow: 处理缓存策略
下面一个个验证,首先看发送,demo如下:
flow {
(1..5).forEach {
emit(it)
}
}.onEach {
Log.e("buffer1","$it is ready")
}.collect {
delay(1000)
Log.e("buffer2","$it is handled")
}
结果如下:
image.png
可以看到,执行流程是先发一个,执行完再发下一个,事件发送和处理是连续的,假如加上buffer()呢,
flow {
(1..5).forEach {
emit(it)
}
}.onEach {
Log.e("buffer1","$it is ready")
}.buffer()
.collect {
delay(1000)
Log.e("buffer2","$it is handled")
}
结果如下:
image.png
可以看到加了之后不管有没有执行,都先发送,然后再一个个执行,不再受collect{}影响。
其二说白了就是设置缓存数量和处理策略,即设置capacity和onBufferOverflow,和上面的MutableSharedFlow有点像,举一个例子基本可以。
flow {
(1..5).forEach {
emit(it)
}
}.onEach {
Log.e("buffer1","$it is ready")
}
.buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
.collect {
delay(1000)
Log.e("buffer2","$it is handled")
}
结果如下:
image.png
设置缓存数量为1,保持处理最旧的事件,DROP_OLDEST(处理缓存最旧),其他的SUSPEND(挂起),DROP_OLDEST(处理缓存最新)
网友评论