美文网首页
Kotlin Flow 三 StateFlow 和 Shared

Kotlin Flow 三 StateFlow 和 Shared

作者: 星流星 | 来源:发表于2021-06-26 22:29 被阅读0次

    StateFlow

    StateFlow 和 LiveData 差不多,都是可观察的数据容器。在 StateFlow 中任何数据的发送,它的每一个接收器都能接收到。在 StateFlow 和 SharedFlow 中收集器也可以被称为订阅者,不过这个订阅者会挂起当前协程,而且永远不会结束。

    private val state = MutableStateFlow(1)
    
    suspend fun simpleStateFlow() {
        coroutineScope {
            launch {
                delay(1000)
                state.collect {
                    println("before state value $it")
                }
            }
            launch {
                for (i in 1..100) {
                    state.emit(i)
                    delay(100)
                }
            }
    
            launch {
                state.collect {
                    println("state value $it")
                }
            }
        }
    }
    

    需要注意的是 collect 是一个挂起函数,所以一旦调用 collect 协程就会被挂起,所以上述的例子中在一个协程中发送数据,在两个协程中接收数据。

    LiveData 不同的在于, LiveData 不需要初始值,但 StateFlow 需要。

    LiveData 会与 Activity 绑定,当 View 进入 STOPED 状态时, LiveData.observer() 会自动取消注册,而从 StateFlow 或任意其他数据流收集数据的操作并不会停止。如需实现相同的行为,需要从 Lifecycle.repeatOnLifecycle 块收集数据流。

    StateFlow热流,并不是冷流。并且 StateFlowcollect 收不到调用之前发射的数据。

    val state = MutableStateFlow(1)
    coroutineScope {
        launch {
            for (i in 0..10) {
                state.emit(i)
                delay(1000)
            }
        }
                                            
        launch {
            delay(2000)
            state.collect {
                println("receive state $it")
            }
        }
    }
    

    可以看到最终的结果是:

    receive state 2
    receive state 3
    receive state 4
    receive state 5
    receive state 6
    receive state 7
    receive state 8
    receive state 9
    receive state 10
    

    因为在接受之前 delay 了 2s,所以最后是从 2 开始接收的。

    把普通的 Flow 转化成 StateFlow

    val flow = flow {
        for (i in 0..4) {
            emit(i)
            delay(100)
        }
    }
    coroutineScope {
        val stateFlow = flow.stateIn(this)
        launch {
            stateFlow.collect {
                println("receive flow.stateIn value $it")
            }
        }
    }
    

    我们同样可以像 LiveData 一样直接获取它的值。

    stateFlow.value
    

    StateFlow 分为 StateFlowMutableStateFlow 。就像 LiveDataMutableLiveData 一样。 StateFlow 只能接收数据,不能发送数据,而 MutableStateFlow 即可以发送也可以接收。

    private suspend fun simpleStateFlowAndMutableStateFlow() {
        val mutableStateFlow = MutableStateFlow(1)
        coroutineScope {
            launch {
                collectData(mutableStateFlow.asStateFlow())
            }
            launch {
                (1..10).forEach {
                    delay(100)
                    mutableStateFlow.emit(it)
                }
            }
        }
    }
    

    如上代码所述,可以将 MutableStateFlow 通过 asStateFlow 转换成 StateFlow

    StateFlow 中给我们提供了一个协程安全的并发修改 StateFlow 中的值的方法 compareAndSet 。该方法能够保证原子的修改 StateFlow 的值。该方法是通过 CAS 来修改值。

    public fun compareAndSet(expect: T, update: T): Boolean
    

    将当前的值和期待的值进行比较,如果相等则更新当前的值,并返回 true,如果不相等则返回 false。这里的比较并修改是原子性的。

    SharedFlow

    SharedFlowStateFlow 相比,他有缓冲区区,并可以定义缓冲区的溢出规则,已经可以定义给一个新的接收器发送多少数据的缓存值。

    SharedFlow 同样有与之对应的 MutableSharedFlowMutableSharedFlow 的参数如下:

    • replay 给一个新的订阅者发送的缓冲区的数量。
    • extraBufferCapacity 除了 replay 的数量之外的缓冲区的大小。
    • onBufferOverflow 缓冲区溢出规则
      • SUSPEND 挂起
      • DROP_OLDEST 移除旧的值
      • DROP_LATEST 移除新的值

    SharedFlow 的缓冲区大于是 replay + extraBufferCapacity 。

    注意相比于 MutableStateFlowMutableSharedFlow 不需要初始值。

    suspend fun simpleSharedFlow() {
        val sharedFlow = MutableSharedFlow<Int>(
            replay = 5,
            extraBufferCapacity = 3,
        )
        coroutineScope {
            launch {
                sharedFlow.collect {
                    println("collect1 received shared flow $it")
                }
            }
            launch {
                (1..10).forEach {
                    sharedFlow.emit(it)
                    delay(100)
                }
            }
            // wait a minute
            delay(1000)
            launch {
                sharedFlow.collect {
                    println("collect2 received shared flow $it")
                }
            }
        }
    }
    

    同样的,我们可以把普通的 Flow 转换成 SharedFlow。

    suspend fun simpleConvertToSharedFlow(started: SharingStarted) {
        var start = 0L
        // create normal flow
        val flow = (1..10).asFlow()
            .onStart { start = currTime() }
            .onEach {
                println("Emit $it ${currTime() - start}ms")
                delay(100)
            }
        // convert to shared flow
        // need coroutine scope
        coroutineScope {
            val sharedFlow = flow.shareIn(this, started, replay = 2)
            delay(400)
            launch {
                println("current time ")
                sharedFlow.collect {
                    println("received convert shared flow $it at ${currTime() - start}ms")
                }
            }
        }
    }
    

    这里的转换有些复杂,可以看到我们通过 shareIn 可以将普通的 flow 转换成 SharedFlow 。可以看到 sharedIn 有三个参数:

    • CoroutineScope - sharing 的协程的作用域。
    • SharingStarted - 启动模式
      • Eagerly 迫切的,渴望的,在转换完成后立即开始 sharing 数据,当上游的数据超过 replay 的时候,前面的数据就会被丢弃,相当于 DROP_OLDEST
      • Lazily 当有第一个订阅者(调用 collect)的时候开始发射数据。
      • WhileSubscribed 当第一个订阅者出现的时候立即开始,当最后一个订阅者消失的时立即停止(默认情况下),replay 数量的缓存值将永远保留(默认情况下)。这是一个函数,可以通过参数来控制当最后一个订阅者消失时的行为,以及缓存的有效期。
        • stopTimeoutMillis - 配置最后一个订阅者消失后 sharing flow 停止的延时。
        • replayExpirationMillis - 配置 sharing flow 协程的停止和重置缓冲区之间的间隔,单位是毫秒,默认值为 Long.MAX_VALUE 缓存永远都不重置,0 表示立即重置缓存。比较难懂可以看看下面的例子。
    • replay 当订阅的时候回复的数量。

    如果上面的函数中传递的是 Eagerly ,其输出如下:

    Emit 1 2ms
    Emit 2 109ms
    Emit 3 213ms
    Emit 4 313ms
    current time 
    received convert shared flow 2 at 412ms
    received convert shared flow 3 at 412ms
    Emit 5 413ms
    received convert shared flow 4 at 414ms
    Emit 6 518ms
    received convert shared flow 5 at 519ms
    Emit 7 619ms
    received convert shared flow 6 at 619ms
    Emit 8 720ms
    received convert shared flow 7 at 720ms
    Emit 9 822ms
    received convert shared flow 8 at 823ms
    Emit 10 926ms
    received convert shared flow 9 at 926ms
    received convert shared flow 10 at 1027ms
    

    如果传入的是 Lazily ,其输入如下:

    current time 
    Emit 1 2ms
    Emit 2 105ms
    received convert shared flow 1 at 106ms
    Emit 3 209ms
    received convert shared flow 2 at 209ms
    Emit 4 313ms
    received convert shared flow 3 at 313ms
    Emit 5 415ms
    received convert shared flow 4 at 415ms
    Emit 6 518ms
    received convert shared flow 5 at 518ms
    Emit 7 622ms
    received convert shared flow 6 at 622ms
    Emit 8 725ms
    received convert shared flow 7 at 725ms
    Emit 9 826ms
    received convert shared flow 8 at 826ms
    Emit 10 932ms
    received convert shared flow 9 at 932ms
    received convert shared flow 10 at 1032ms
    

    很明显能够看出两者的区别。

    下面看看 WhileSubscribed ,这种方式非常灵活。

    fun currTime() = System.currentTimeMillis()
    
    suspend fun simpleConvertToSharedFlow(started: SharingStarted) {
        var start = 0L
        // create normal flow
        val flow = (1..10).asFlow()
            .onStart { start = currTime() }
            .onEach {
                println("Emit $it ${currTime() - start}ms")
                delay(100)
            }
        // convert to shared flow
        // need coroutine scope
        coroutineScope {
            val sharedFlow = flow.shareIn(this, started, replay = 2)
            val job = launch {
                println("current time ")
                sharedFlow.collect {
                    println("received convert shared flow $it at ${currTime() - start}ms")
                }
            }
    
            launch {
                delay(1000L)
                job.cancel()
                delay(110L)
                sharedFlow.collect {
                    println("received again shared flow $it")
                }
                println("shared flow has stop")
            }
        }
    }
    
    @OptIn(ExperimentalTime::class)
    suspend fun main() {
    //    simpleSharedFlow()
        simpleConvertToSharedFlow(
            SharingStarted.WhileSubscribed(
                stopTimeout = 100L.toDuration(DurationUnit.MILLISECONDS),
                replayExpiration = 200L.toDuration(DurationUnit.MILLISECONDS)
            )
        )
    }
    

    这里配置当最后一个订阅者消失时 delay 100ms 后停止 sharing flow,在 sharing flow 停止后 200ms 后让缓存失效。这里可以通过调整 job.cancel 后的 delay 函数的时长来看看效果。当时间为 110ms 时,会重新接受到缓存 9 和 10,并重新开始 sharing flow,如果参数调整为 320ms 时,缓存会失效,会直接重新开始 sharing flow。

    110ms 的结果:

    current time 
    Emit 1 1ms
    Emit 2 107ms
    received convert shared flow 1 at 108ms
    Emit 3 211ms
    received convert shared flow 2 at 211ms
    Emit 4 315ms
    received convert shared flow 3 at 315ms
    Emit 5 417ms
    received convert shared flow 4 at 417ms
    Emit 6 521ms
    received convert shared flow 5 at 521ms
    Emit 7 623ms
    received convert shared flow 6 at 624ms
    Emit 8 727ms
    received convert shared flow 7 at 727ms
    Emit 9 829ms
    received convert shared flow 8 at 829ms
    Emit 10 933ms
    received convert shared flow 9 at 933ms
    received again shared flow 9
    received again shared flow 10
    Emit 1 0ms
    Emit 2 105ms
    received again shared flow 1
    Emit 3 210ms
    received again shared flow 2
    Emit 4 314ms
    received again shared flow 3
    Emit 5 415ms
    received again shared flow 4
    Emit 6 519ms
    received again shared flow 5
    Emit 7 620ms
    received again shared flow 6
    Emit 8 721ms
    received again shared flow 7
    Emit 9 826ms
    received again shared flow 8
    Emit 10 927ms
    received again shared flow 9
    received again shared flow 10
    

    320ms 的结果:

    current time 
    Emit 1 1ms
    Emit 2 106ms
    received convert shared flow 1 at 106ms
    Emit 3 210ms
    received convert shared flow 2 at 210ms
    Emit 4 314ms
    received convert shared flow 3 at 314ms
    Emit 5 414ms
    received convert shared flow 4 at 414ms
    Emit 6 517ms
    received convert shared flow 5 at 517ms
    Emit 7 623ms
    received convert shared flow 6 at 623ms
    Emit 8 726ms
    received convert shared flow 7 at 727ms
    Emit 9 827ms
    received convert shared flow 8 at 827ms
    Emit 10 931ms
    received convert shared flow 9 at 931ms
    Emit 1 0ms
    Emit 2 105ms
    received again shared flow 1
    Emit 3 209ms
    received again shared flow 2
    Emit 4 315ms
    received again shared flow 3
    Emit 5 418ms
    received again shared flow 4
    Emit 6 523ms
    received again shared flow 5
    Emit 7 627ms
    received again shared flow 6
    Emit 8 732ms
    received again shared flow 7
    Emit 9 833ms
    received again shared flow 8
    Emit 10 937ms
    received again shared flow 9
    received again shared flow 10
    

    我们看下面这段源码就会很快明白:

    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = subscriptionCount
        .transformLatest { count ->
            if (count > 0) {
                emit(SharingCommand.START)
            } else {
                delay(stopTimeout)
                if (replayExpiration > 0) {
                    emit(SharingCommand.STOP)
                    delay(replayExpiration)
                }
                emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
            }
        }
        .dropWhile { it != SharingCommand.START } // don't emit any STOP/RESET_BUFFER to start with, only START
        .distinctUntilChanged() // just in case somebody forgets it, don't leak our multiple sending of START
    

    相关文章

      网友评论

          本文标题:Kotlin Flow 三 StateFlow 和 Shared

          本文链接:https://www.haomeiwen.com/subject/ohhtultx.html