Kotlin SharedFlow 使用

作者: 唠嗑008 | 来源:发表于2023-06-18 11:38 被阅读0次

    前言

    与Flow(冷流)不同,SharedFlow是热流。它可以在多个消费者之间共享数据,并且可以在任何时候发射新值。这使得它非常适合用于多个消费者需要访问相同数据的情况。不过本文并不打算深入讲解SharedFlow原理,而是从结合demo从使用上来带大家熟悉其特性。

    SharedFlow 使用

    最基础的生产消费模型

     runBlocking {
            val sharedFlow = MutableSharedFlow<Int>()
            launch {
    //消费者接收数据
                sharedFlow.collect {
                    println("collect: $it")
                }
            }
            delay(100) //确保已经订阅
          //生产者发射数据
            sharedFlow.emit(1)
        }
    

    输出:collect: 1

    这种最简单的模式下,是看到了预期的打印。这种应该是大家都能理解的生产者-消费者模型。

    消费者没有在单独的协程

     runBlocking {
            val sharedFlow = MutableSharedFlow<Int>()
    
            sharedFlow.collect {
                println("collect: $it")
            }
    
            println("wait emit")
    
            delay(100) //确保已经订阅
            sharedFlow.emit(1)
        }
    

    猜一下结果?
    没有打印输出

    注意:这里区别是collect没有在单独的协程调用。因为collect是个挂起函数,会让当前协程挂起。由于生产者还没生产数据,消费者调用collect时发现没数据后便挂起协程。所以生产者和消费者要处在不同的协程里。

    生产者先发射,消费者再接收

     runBlocking {
            val sharedFlow = MutableSharedFlow<Int>()
            sharedFlow.emit(1)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                }
            }
        }
    

    结果:collect没有收到数据。
    原因:先发射了数据,此时消费者还没有订阅,导致数据丢失。这也就说明了SharedFlow默认是没有粘性的。

    关于”粘性“:对于新的订阅者重放其已发出的值。这意味着当一个新的订阅者被添加到一个流中时,它将接收到流先前发出的所有值,即使它们在订阅之前已经被发出。

    我们大胆猜想下,要让SharedFlow具备粘性,就应该让其具有缓存机制。

    历史数据的重放机制
    用过livedata的人都只知道,即使先更新了数据,但每次添加了新的观察者,都能收到最新的数据,但SharedFlow默认是不具备这种能力的。但是这并不代表SharedFlow不行,而是需要一定的配置才能实现这种能力,其实SharedFlow这方面能力比livedata更强大,LiveData只能收到一个最新的值,但是SharedFlow经过配置之后是可以收到多个发射的历史数据。

    public fun <T> MutableSharedFlow(
        replay: Int = 0,
        extraBufferCapacity: Int = 0,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ): MutableSharedFlow<T> {
    

    先来看下MutableSharedFlow的构造方法中的参数

    • replay:重放次数。可以给订阅者发送之前已经发射的数据,而发射数据的个数就是通过replay指定的;
    • extraBufferCapacity:是指Buffer中除了replay外,额外增加的缓存数量;
    • onBufferOverflow:缓存区满了之后的溢出策略,有3种策略可供选择。默认BufferOverflow.SUSPEND,缓存溢出时挂起;另外还有2种丢弃策略,DROP_OLDESTDROP_LATEST,分别是溢出时丢弃缓冲区中最旧的值和最新的值。

    只配置replay

     runBlocking {
            val sharedFlow = MutableSharedFlow<Int>(replay = 1)
            sharedFlow.emit(1)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                }
            }
        }
    

    结果:collect是收到了数据

    这是replay缓存区缓存数量为1,所以后面添加的收集者可以收到历史数据。当然这个数量,你可以任意指定。

    设置extraBufferCapacity

      runBlocking {
           val sharedFlow = MutableSharedFlow<Int>(
                replay = 2,
                extraBufferCapacity = 1
            )
    
            sharedFlow.emit(1)
            sharedFlow.emit(2)
            sharedFlow.emit(3)
    
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                    delay(1000)
                }
            }
    
        }
    

    先猜下这段代码的结果是?

    结果:collect: 2 collect: 3

    可能很多人会很奇怪,前面说过缓存数量bufferSize是replay + extraBufferCapacity。那这段代码中bufferSize是3啊,为什么collect只有2条数据呢?我敢说,这个问题很多用ShareFlow的人都没有搞清楚(至少我在网上看到的博客是这样的)。

    我先不解释,我们再来看一个例子

     runBlocking {
            val sharedFlow = MutableSharedFlow<Int>(
                replay = 2,
                extraBufferCapacity = 1
            )
            sharedFlow.emit(1)
            sharedFlow.emit(2)
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                    delay(1000) //模拟处理背压
                }
            }
            delay(200)
            sharedFlow.emit(3)
            sharedFlow.emit(4)
        }
    

    结果:collect能收到4个数据

    在这段代码中,replay和extraBufferCapacity没有变化。区别是先发射了2条数据1,2;然后开始订阅,等订阅了之后再发射数据3,4。而前一段代码是先发射完所有数据在开始订阅。

    解析:extraBufferCapacity 是用于控制额外缓冲区的容量。额外缓冲区是一个用于存储新值的缓冲区,当重放缓冲区已满时,新值将被存储在额外缓冲区中,直到有收集器准备好收集它为止。总结下,extraBufferCapacity对应的额外缓冲区是要在有收集者订阅之后才能起作用,否则只有replay重放缓存区起作用。

    emit也是个挂起函数

     runBlocking {
            val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                    delay(1000)
                }
            }
    
            launch {
                sharedFlow.collect {
                    println("collect2: $it")
                    delay(2000)
                }
            }
    
            delay(200)
            for (value in 1 until 4){
                println("emit value: $value")
                sharedFlow.emit(value)
            }
    
        }
    

    输出:
    emit value: 1
    collect: 1
    collect2: 1
    emit value: 2
    collect: 2
    collect2: 2
    emit value: 3
    collect: 3
    collect2: 3

    从打印可以看出:生产者要等待消费者消费完数据才进行下一次emit

    通过replay或者extraBufferCapacity解决背压问题
    SharedFlow 是一种具有背压支持的流。背压是一种流量控制机制,用于控制数据流的速率,以确保接收端能够处理数据的速度不超过其处理能力。在 SharedFlow 中,背压机制通过以下方式实现:

    当缓冲区已满时, emit 函数将会被挂起,直到缓冲区中有足够的空间来接收新的值。这样可以避免生产者发送大量的数据,而消费者无法及时处理的情况,从而导致内存溢出或应用程序崩溃。

    当消费者收到新值时,如果其处理速度较慢,那么生产者将会被挂起,直到消费者处理完所有的值,并释放了足够的空间来接收新的值。这样可以避免消费者被压垮,从而导致应用程序变得不可用。

    因此, SharedFlow 的背压机制可以确保生产者和消费者之间的数据流量得到平衡,以避免出现数据丢失或内存泄漏等问题。这使得 SharedFlow 成为处理大量数据的可靠和高效的方案之一。

    但是这样有个问题,生产者速度可能被消费者拖累。先来看一段代码:

    runBlocking {
    //        val sharedFlow = MutableSharedFlow<Int>(replay = 3, extraBufferCapacity = 0)
            val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 3)
    
            launch {
                sharedFlow.collect {
                    println("collect: $it")
                    delay(1000)
                }
            }
    
            launch {
                sharedFlow.collect {
                    println("collect2: $it")
                    delay(2000)
                }
            }
    
            delay(200)
            for (value in 1 until 4){
                println("emit value: $value")
                sharedFlow.emit(value)
            }
    

    输出:emit value: 1
    emit value: 2
    emit value: 3
    collect: 1
    collect2: 1
    collect: 2
    collect2: 2
    collect: 3
    collect2: 3

    这段代码中消费者的速度明显比生产者慢很多,但我们通过配置replay或者extraBufferCapacity来设置了缓存buffer,就可以避免消费者拖累生产者速度的问题。

    相关文章

      网友评论

        本文标题:Kotlin SharedFlow 使用

        本文链接:https://www.haomeiwen.com/subject/jsfkydtx.html