前言
与Flow(冷流)不同,SharedFlow是热流。它可以在多个消费者之间共享数据,并且可以在任何时候发射新值。这使得它非常适合用于多个消费者需要访问相同数据的情况。不过本文并不打算深入讲解SharedFlow原理,而是从结合demo从使用上来带大家熟悉其特性。
SharedFlow 使用
最基础的生产消费模型
runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
launch {
//消费者接收数据
sharedFlow.collect {
println("collect: $it")
}
}
delay(100) //确保已经订阅
//生产者发射数据
sharedFlow.emit(1)
}
输出:collect: 1
这种最简单的模式下,是看到了预期的打印。这种应该是大家都能理解的生产者-消费者模型。
消费者没有在单独的协程
runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
sharedFlow.collect {
println("collect: $it")
}
println("wait emit")
delay(100) //确保已经订阅
sharedFlow.emit(1)
}
猜一下结果?
没有打印输出
注意:这里区别是collect
没有在单独的协程调用。因为collect
是个挂起函数,会让当前协程挂起。由于生产者还没生产数据,消费者调用collect
时发现没数据后便挂起协程。所以生产者和消费者要处在不同的协程里。
生产者先发射,消费者再接收
runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
sharedFlow.emit(1)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
结果:collect没有收到数据。
原因:先发射了数据,此时消费者还没有订阅,导致数据丢失。这也就说明了SharedFlow默认是没有粘性的。
关于”粘性“:对于新的订阅者重放其已发出的值。这意味着当一个新的订阅者被添加到一个流中时,它将接收到流先前发出的所有值,即使它们在订阅之前已经被发出。
我们大胆猜想下,要让SharedFlow具备粘性,就应该让其具有缓存机制。
历史数据的重放机制
用过livedata的人都只知道,即使先更新了数据,但每次添加了新的观察者,都能收到最新的数据,但SharedFlow默认是不具备这种能力的。但是这并不代表SharedFlow不行,而是需要一定的配置才能实现这种能力,其实SharedFlow这方面能力比livedata更强大,LiveData只能收到一个最新的值,但是SharedFlow经过配置之后是可以收到多个发射的历史数据。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
先来看下MutableSharedFlow的构造方法中的参数
- replay:重放次数。可以给订阅者发送之前已经发射的数据,而发射数据的个数就是通过replay指定的;
- extraBufferCapacity:是指Buffer中除了replay外,额外增加的缓存数量;
- onBufferOverflow:缓存区满了之后的溢出策略,有3种策略可供选择。默认
BufferOverflow.SUSPEND
,缓存溢出时挂起;另外还有2种丢弃策略,DROP_OLDEST
与DROP_LATEST
,分别是溢出时丢弃缓冲区中最旧的值和最新的值。
只配置replay
runBlocking {
val sharedFlow = MutableSharedFlow<Int>(replay = 1)
sharedFlow.emit(1)
launch {
sharedFlow.collect {
println("collect: $it")
}
}
}
结果:collect是收到了数据
这是replay缓存区缓存数量为1,所以后面添加的收集者可以收到历史数据。当然这个数量,你可以任意指定。
设置extraBufferCapacity
runBlocking {
val sharedFlow = MutableSharedFlow<Int>(
replay = 2,
extraBufferCapacity = 1
)
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
launch {
sharedFlow.collect {
println("collect: $it")
delay(1000)
}
}
}
先猜下这段代码的结果是?
结果:collect: 2 collect: 3
可能很多人会很奇怪,前面说过缓存数量bufferSize是replay + extraBufferCapacity。那这段代码中bufferSize是3啊,为什么collect只有2条数据呢?我敢说,这个问题很多用ShareFlow的人都没有搞清楚(至少我在网上看到的博客是这样的)。
我先不解释,我们再来看一个例子
runBlocking {
val sharedFlow = MutableSharedFlow<Int>(
replay = 2,
extraBufferCapacity = 1
)
sharedFlow.emit(1)
sharedFlow.emit(2)
launch {
sharedFlow.collect {
println("collect: $it")
delay(1000) //模拟处理背压
}
}
delay(200)
sharedFlow.emit(3)
sharedFlow.emit(4)
}
结果:collect能收到4个数据
在这段代码中,replay和extraBufferCapacity没有变化。区别是先发射了2条数据1,2;然后开始订阅,等订阅了之后再发射数据3,4。而前一段代码是先发射完所有数据在开始订阅。
解析:
extraBufferCapacity
是用于控制额外缓冲区的容量。额外缓冲区是一个用于存储新值的缓冲区,当重放缓冲区已满时,新值将被存储在额外缓冲区中,直到有收集器准备好收集它为止。总结下,extraBufferCapacity对应的额外缓冲区是要在有收集者订阅之后才能起作用,否则只有replay重放缓存区起作用。
emit也是个挂起函数
runBlocking {
val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)
launch {
sharedFlow.collect {
println("collect: $it")
delay(1000)
}
}
launch {
sharedFlow.collect {
println("collect2: $it")
delay(2000)
}
}
delay(200)
for (value in 1 until 4){
println("emit value: $value")
sharedFlow.emit(value)
}
}
输出:
emit value: 1
collect: 1
collect2: 1
emit value: 2
collect: 2
collect2: 2
emit value: 3
collect: 3
collect2: 3
从打印可以看出:生产者要等待消费者消费完数据才进行下一次emit
通过replay或者extraBufferCapacity解决背压问题
SharedFlow
是一种具有背压支持的流。背压是一种流量控制机制,用于控制数据流的速率,以确保接收端能够处理数据的速度不超过其处理能力。在 SharedFlow
中,背压机制通过以下方式实现:
当缓冲区已满时, emit
函数将会被挂起,直到缓冲区中有足够的空间来接收新的值。这样可以避免生产者发送大量的数据,而消费者无法及时处理的情况,从而导致内存溢出或应用程序崩溃。
当消费者收到新值时,如果其处理速度较慢,那么生产者将会被挂起,直到消费者处理完所有的值,并释放了足够的空间来接收新的值。这样可以避免消费者被压垮,从而导致应用程序变得不可用。
因此, SharedFlow
的背压机制可以确保生产者和消费者之间的数据流量得到平衡,以避免出现数据丢失或内存泄漏等问题。这使得 SharedFlow
成为处理大量数据的可靠和高效的方案之一。
但是这样有个问题,生产者速度可能被消费者拖累。先来看一段代码:
runBlocking {
// val sharedFlow = MutableSharedFlow<Int>(replay = 3, extraBufferCapacity = 0)
val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 3)
launch {
sharedFlow.collect {
println("collect: $it")
delay(1000)
}
}
launch {
sharedFlow.collect {
println("collect2: $it")
delay(2000)
}
}
delay(200)
for (value in 1 until 4){
println("emit value: $value")
sharedFlow.emit(value)
}
输出:emit value: 1
emit value: 2
emit value: 3
collect: 1
collect2: 1
collect: 2
collect2: 2
collect: 3
collect2: 3
这段代码中消费者的速度明显比生产者慢很多,但我们通过配置replay或者extraBufferCapacity来设置了缓存buffer,就可以避免消费者拖累生产者速度的问题。
网友评论