Kotlin StateFlow、SharedFlow、Chan

作者: 奔跑吧李博 | 来源:发表于2023-11-19 20:29 被阅读0次

    StateFlowSharedFlow 是 Flow API,允许数据流以最优方式发出状态更新并向多个使用方发出值。

    StateFlow

    官方文档解释:StateFlow 是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。还可通过其 value 属性读取当前状态值。如需更新状态并将其发送到数据流,请为 MutableStateFlow 类的 value 属性分配一个新值。

    StateFlow有两种类型: StateFlow 和 MutableStateFlow :

    public interface StateFlow<out T> : SharedFlow<T> {
        public val value: T
    }
    
    public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
        public override var value: T
    
        public fun compareAndSet(expect: T, update: T): Boolean
    }
    

    状态由其值表示。任何对值的更新都会反馈新值到所有流的接收器中。

    StateFlow 基本使用
    class StateFlow {
    
        private val _state = MutableStateFlow<String>("unKnown")
        val state: kotlinx.coroutines.flow.StateFlow<String> get() = _state
    
        fun getApi(scope: CoroutineScope) {
            scope.launch {
                var res = getApi()
                _state.value = res
            }
        }
    
        /**
         * 进行网络Api请求
         */
        private suspend fun getApi() = withContext(Dispatchers.IO) {
            delay(2000) //模拟耗时请求
            "result"
        }
    
    }
    
        private fun stateFlowTest() = runBlocking {
            val stateFlow = StateFlow()
            stateFlow.getApi(this) //开始获取结果
    
            launch(Dispatchers.IO) {
                stateFlow.state.collect {
                    Log.i("minfo", "${Thread.currentThread().name} + $it")
                }
            }
    
            launch(Dispatchers.IO) {
                stateFlow.state.collect {
                    Log.i("minfo", "${Thread.currentThread().name} + $it")
    
                }
            }
        }
    

    打印结果:

     I  DefaultDispatcher-worker-1 + unKnown
     I  DefaultDispatcher-worker-2 + unKnown
    // 等待两秒
     I  DefaultDispatcher-worker-1 + result
     I  DefaultDispatcher-worker-1 + result
    

    StateFlow 的使用方式与 LiveData 类似。
    MutableStateFlow 是可变类型的,即可以改变 value 的值。 StateFlow 则是只读的。这与 LiveData、MutableLiveData一样。

    为什么使用 StateFlow

    我们知道 LiveData 有如下特点:

    1.只能在主线程更新数据,即使在子线程通过 postValue()方法,最终也是将值 post 到主线程调用的 setValue()
    2.LiveData 是不防抖的
    3.LiveData 的 transformation 是在主线程工作
    4.LiveData 需要正确处理 “粘性事件” 问题。
    鉴于此,使用 StateFlow 可以轻松解决上述场景。

    class StateFlow {
    
        private val _state = MutableStateFlow<String>("unKnown")
        val state: kotlinx.coroutines.flow.StateFlow<String> get() = _state
    
        fun getApi2(scope: CoroutineScope) {
            scope.launch {
                delay(2000)
                _state.value = "hello, coroutine"
            }
        }
    
        fun getApi3(scope: CoroutineScope) {
            scope.launch {
                delay(2000)
                _state.value = "hello, kotlin"
            }
        }
    }
    
        fun stateFlowTest2() = runBlocking<Unit> {
            val stateFlow: StateFlow = StateFlow()
    
            stateFlow.getApi2(this) // 开始获取结果
            delay(1000)
            stateFlow.getApi3(this) // 开始获取结果
    
            val job1 = launch(Dispatchers.IO) {
                delay(8000)
                stateFlow.state.collect {
                    Log.i("minfo", "${Thread.currentThread().name} + $it")
                }
            }
            val job2 = launch(Dispatchers.IO) {
                delay(8000)
                stateFlow.state.collect {
                    Log.i("minfo", "${Thread.currentThread().name} + $it")
                }
            }
    
            // 避免任务泄漏,手动取消
            delay(10000)
            job1.cancel()
            job2.cancel()
        }
    

    现在的场景是,先请求 getApi1(), 一秒之后再次请求 getApi2(), 这样 stateFlow 的值加上初始值,一共被赋值过 3 次。确保,三次赋值都完成后,我们再收集 StateFlow 中的数据。
    打印结果:

     DefaultDispatcher-worker-1 + hello, kotlin
     DefaultDispatcher-worker-2 + hello, kotlin
    

    结果显示了,StateFlow 只会将最新的数据发射给订阅者。对比 LiveData, LiveData 内部有 version 的概念,对于注册的订阅者,会根据 version 进行判断,将历史数据发送给订阅者。即所谓的“粘性”。而SharedFlow可以做到此种粘性。

    如需将任何数据流转换为 StateFlow,请使用stateIn中间运算符。

    StateFlow、Flow 和 LiveData

    StateFlow 和LiveData具有相似之处。两者都是可观察的数据容器类,并且在应用架构中使用时,两者都遵循相似模式。

    但请注意,StateFlow 和LiveData的行为确实有所不同:

    • StateFlow 需要将初始状态传递给构造函数,而 LiveData 不需要。
    • 当 View 进入 STOPPED 状态时,LiveData.observe() 会自动取消注册使用方,而从 StateFlow 或任何其他数据流收集数据的操作并不会自动停止。如需实现相同的行为,您需要从 Lifecycle.repeatOnLifecycle 块收集数据流。
    SharedFlow

    SharedFlow 也有两种类型:SharedFlow 和 MutableSharedFlow。
    使用 sharedIn 方法可以将 Flow 转换为 SharedFlow。

    public fun <T> MutableSharedFlow(
       replay: Int,   // 当新的订阅者Collect时,发送几个已经发送过的数据给它
       extraBufferCapacity: Int = 0,  // 减去replay,MutableSharedFlow还缓存多少数据
       onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND  // 缓存溢出时的处理策略,三种 丢掉最新值、丢掉最旧值和挂起
    ): MutableSharedFlow<T>
    
    class SharedFlow {
    
        private val _state = MutableSharedFlow<Int>(replay = 3,
            extraBufferCapacity = 2)
        val state: MutableSharedFlow<Int> get() = _state
    
        fun getApi(scope: CoroutineScope) {
            scope.launch {
                for (i in 0..20) {
                    delay(200)
                    _state.emit(i)
                    Log.i("minfo", "send data $i")
                }
            }
        }
    
        fun main() = runBlocking {
            getApi(this)
    
            val job = launch(Dispatchers.IO) {
                delay(3000)
                state.collect {
                    Log.i("minfo", "collect  $it")
                }
            }
            delay(5000)
            job.cancel()
        }
    }
    

    打印结果:

    send data 0
    send data 1
    send data 2
    send data 3
    send data 4
    send data 5
    send data 6
    send data 7
    send data 8
    send data 9
    send data 10
    send data 11
    send data 12
    send data 13
    collect  11
    collect  12
    collect  13
    send data 14
    collect  14
    send data 15
    collect  15
    send data 16
    collect  16
    send data 17
    collect  17
    send data 18
    collect  18
    send data 19
    collect  19
    send data 20
    collect  20
    

    分析一下该结果:
    SharedFlow 每 200ms 发射一次数据,总共发射 21 个数据出来,耗时大约 4s。
    SharedFlow 的 replay 设置为 3, extraBufferCapacity 设置为2, 即 SharedFlow 的缓存为 5 。缓存溢出的处理策略是默认挂起的。
    订阅者是在 3s 之后开始收集数据的。此时应该已经发射了 14 个数据,即 0-13, SharedFlow 的缓存为 5, 缓存的数据为 9-13, 但是,只给订阅者发送 3 个旧数据,即订阅者收集到的值是从 11 开始的。

    StateFlow 和 SharedFlow 的使用场景

    StateFlow 的命名已经说明了适用场景, StateFlow 只会向订阅者发射最新的值,适用于对状态的监听。
    SharedFlow 可以配置对历史发射的数据进行订阅,适合用来处理对于事件的监听。

    Channel

    Flow底层使用的Channel机制实现,StateFlow、SharedFlow都是一对多的关系,如果上游发送者与下游UI层的订阅者是一对一的关系,可以使用Channel来实现,Channel默认是粘性的。

    Channel使用特点:
    每个消息只有一个订阅者可以收到,用于一对一的通信。

    Channel使用示例:

    //viewModel中
    private val _loadingChannel = Channel<Boolean>()
    val loadingFlow = _loadingChannel.receiveAsFlow()
    
    private suspend fun loadStart() {
        _loadingChannel.send(true)
    }
    
    private suspend fun loadFinish() {
        _loadingChannel.send(false)
    }
    
    //UI层接收Loading信息
     mViewModel.loadingFlow.flowWithLifecycle2(this, Lifecycle.State.STARTED) { isShow ->
         mStatusViewUtil.showLoadingView(isShow)
     }
    

    参考:
    https://developer.android.com/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn

    https://www.cnblogs.com/joy99/p/15805955.html#13-%E5%88%9B%E5%BB%BA%E5%B8%B8%E8%A7%84-flow-%E7%9A%84%E5%B8%B8%E7%94%A8%E6%96%B9%E5%BC%8F

    用到demo:

    https://github.com/running-libo/FlowUse

    相关文章

      网友评论

        本文标题:Kotlin StateFlow、SharedFlow、Chan

        本文链接:https://www.haomeiwen.com/subject/mwdmwdtx.html