美文网首页
【Koltin Flow(五)】SharedFlow及State

【Koltin Flow(五)】SharedFlow及State

作者: MakerGaoGao | 来源:发表于2022-08-05 09:37 被阅读0次

    目录

    【Koltin Flow(一)】五种创建flow的方式
    【Koltin Flow(二)】Flow操作符之末端操作符
    【Koltin Flow(三)】Flow操作符之中间操作符(一)
    【Koltin Flow(三)】Flow操作符之中间操作符(二)
    【Koltin Flow(三)】Flow操作符之中间操作符(三)
    【Koltin Flow(四)】Flow背压
    【Koltin Flow(五)】SharedFlow及StateFlow

    SharedFlow

    简介

    相对于Flow而言,SharedFlow为热流,也就是说不管有无接收者,都会发送值。
    

    一、基本使用

    代码如下:
                val sharedFlow = MutableSharedFlow<Int>()
                launch {
                    sharedFlow.collect {
                        Log.d(TAG.TAG,"SharedFlow $it")
                    }
                }
    
                delay(10)
                sharedFlow.emit(1)
                sharedFlow.emit(100)
                sharedFlow.emit(100)
    
    日志如下:
    2022-08-03 10:16:53.366 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 1
    2022-08-03 10:16:53.366 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 100
    2022-08-03 10:16:53.367 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 100
    
    分析
    • 日志可以看出 基本使用的时候和flow本身没多大区别,发送-接收。
    • 可以看出上面有个delay(10),如果没有则可能会接收不到值,因为SharedFlow为热流,不管有无接收者,emit都会直接发送值。

    二、设置订阅重发

    我们看SharedFlow构造的第一个参数replay,此参数用来设置订阅重发的个数。
    
    代码如下:
                val sharedFlow = MutableSharedFlow<Int>(
                    replay = 2
                )
    
                sharedFlow.emit(1)
                sharedFlow.emit(100)
                sharedFlow.emit(100)
                //注释1  在此处加上重发缓存打印
                //Log.d(TAG.TAG,"SharedFlow ${sharedFlow.replayCache}")
                delay(100)
    
                launch {
                    sharedFlow.collect {
                        Log.d(TAG.TAG,"SharedFlow $it")
                    }
                }
                 //注释2 重置缓存
                //delay(100)
                //sharedFlow.resetReplayCache()
                //Log.d(TAG.TAG,"SharedFlow ${sharedFlow.replayCache}")
                //launch {
                  //  sharedFlow.collect {
                    //    Log.d(TAG.TAG,"SharedFlow 2 $it")
                    //}
                //}
    
    日志如下:
    //注释1 处的打印
    //2022-08-03 10:23:13.621 5106-5131/edu.test.demo D/Test-TAG: SharedFlow [100, 100]
    2022-08-03 10:21:14.906 5043-5070/edu.test.demo D/Test-TAG: SharedFlow 100
    2022-08-03 10:21:14.906 5043-5070/edu.test.demo D/Test-TAG: SharedFlow 100
    //注释2下的打印
    //2022-08-03 10:26:54.917 5233-5260/edu.test.demo D/Test-TAG: SharedFlow []
    
    分析:
    • 可以看到,虽然是发送在前,接收在后,但还是收到了两个值。因为replay重发的值设置为2,可以这样看起来不直观,这样,我们修改下代码,放开注释1出的打印,则日志会多出来【注释1 处的打印】部分的内容,可以很清晰的看出缓存了后两个值。
    • 当然此缓存可以重置,也就是清空之前的缓存,放开注释2处的代码,则可以看出resetReplayCache,之后则清空了缓存,后面的collect接收不到缓存的值。

    三、其他两个参数,可参考背压部分的内容

    四、shareIn操作符

    shareIn操作符是将冷流flow转换为热流SharedFlow,主要参数有三个
    1、第一个为作用域。
    2、策略,分为三种Eagerly(立即发送)、Lazily(有第一个订阅者之后发送)、
    WhileSubscribed()(在第一个订阅者出现之后开始、在最后一个订阅者、消失后结束),可配置二外的参数:
    stopTimeoutMillis 为最后一个订阅者小时候保留的时长,单位ms,默认为0。
    replayExpirationMillis 为最后一个订阅者消失后,缓存保留的时长,单位ms,默认为Long.MAX_VALUE。
    3、缓存重发的个数。
    
    第一种策略
    代码如下:
         val sharedFlow = (1..5).asFlow().shareIn(this, SharingStarted.Eagerly,0)
                delay(100)
                sharedFlow.collect {
                    Log.d(TAG.TAG,"shareIn $it")
                }
    
    日志没有
    分析:
    • 我们发现没有日志,原因在哪里呢,因为转换成热流之后策略为Eagerly,立即开始发送,但是100ms之后才有collect,同时replay的个数为0,所以接收不到值,如果我们将replay个数改为2,则可以接收到4和5,和上面的订阅重发是一样的。
    第二种策略
    代码如下:
        val sharedFlow = (1..5).asFlow().shareIn(this, SharingStarted.Lazily, 2)
                delay(100)
                launch {
                    sharedFlow.collect {
                        Log.d(TAG.TAG, "shareIn $it")
                    }
                }
                delay(100)
                launch {
                    sharedFlow.collect {
                        Log.d(TAG.TAG, "shareIn 2 $it")
                    }
                }
    
    日志如下:
    2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 1
    2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 2
    2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 3
    2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 4
    2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 5
    2022-08-03 11:09:10.205 6691-6720/edu.test.demo D/Test-TAG: shareIn 2 4
    2022-08-03 11:09:10.205 6691-6720/edu.test.demo D/Test-TAG: shareIn 2 5
    
    分析:
    • 可以看出,第一个collect能接收到全部值,那是因为Lazily是在第一个订阅者出现后才发送值的,但是第二个collect却只接收到了缓存的两个值,那是因为Lazily只管第一个collect,不管后续的collect。
    第三种策略
    1. 采用默认参数
    代码如下:
      var time  = 0L
                time = System.currentTimeMillis()
                val sharedFlow = (1..100).asFlow().onStart {
                    Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
                }.onCompletion {
                    Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
                }.onEach {
                    delay(1000)
                }.shareIn(this, SharingStarted.WhileSubscribed(), 0)
    
                delay(1000)
                launch {
                    Log.d(TAG.TAG, "1 shareIn ${sharedFlow.first()}")
                    Log.d(TAG.TAG,"1 接收到第一个值 ${System.currentTimeMillis() - time}")
                }
    
                delay(3000)
                launch {
                    Log.d(TAG.TAG, "2 shareIn ${sharedFlow.first()}")
                    Log.d(TAG.TAG,"2 接收到第一个值 ${System.currentTimeMillis() - time}")
                }
    
    日志如下:
    2022-08-03 14:00:32.842 8905-8930/edu.test.demo D/Test-TAG: onStart 1029
    2022-08-03 14:00:33.844 8905-8930/edu.test.demo D/Test-TAG: 1 shareIn 1
    2022-08-03 14:00:33.844 8905-8930/edu.test.demo D/Test-TAG: 1 接收到第一个值 2031
    2022-08-03 14:00:33.845 8905-8931/edu.test.demo D/Test-TAG: onCompletion 2032
    2022-08-03 14:00:35.839 8905-8931/edu.test.demo D/Test-TAG: onStart 4026
    2022-08-03 14:00:36.841 8905-8931/edu.test.demo D/Test-TAG: 2 shareIn 1
    2022-08-03 14:00:36.841 8905-8931/edu.test.demo D/Test-TAG: 2 接收到第一个值 5028
    2022-08-03 14:00:36.841 8905-8930/edu.test.demo D/Test-TAG: onCompletion 5028
    
    分析:
    • 可以看出 在first之后第一个接收者消失,所以执行了onCompletion,也就是flow结束了,而且接收到第一个值和onCompletion的时间基本是一致的。
    • 在4000ms之后第二个接收者出现,重新执行了onStart ,并且在first之后也执行了onCompletion结束了。
    2.进行相关的参数配置
    代码如下:
                 var time  = 0L
                time = System.currentTimeMillis()
                val sharedFlow = (1..100).asFlow().onStart {
                    Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
                }.onCompletion {
                    Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
                }.onEach {
                    delay(1000)
                }.shareIn(this, SharingStarted.WhileSubscribed(
                    stopTimeoutMillis = 500,
                    replayExpirationMillis = 2000
                ), replay = 5)
    
                delay(1*1000)
                launch {
                    Log.d(TAG.TAG, "1 shareIn ${sharedFlow.take(5).toList()}")
                    Log.d(TAG.TAG,"1 接收到值 ${System.currentTimeMillis() - time}")
                }
    
                delay(10*1000)
                launch {
                    Log.d(TAG.TAG, "2 shareIn ${sharedFlow.take(10).toList()}")
                    Log.d(TAG.TAG,"2 接收到值 ${System.currentTimeMillis() - time}")
                }
    
    日志如下( stopTimeoutMillis = 500,replayExpirationMillis = 2000,replay = 5):
    2022-08-03 14:21:15.245 9591-9616/edu.test.demo D/Test-TAG: onStart 1030
    2022-08-03 14:21:20.252 9591-9616/edu.test.demo D/Test-TAG: 1 shareIn [1, 2, 3, 4, 5]
    2022-08-03 14:21:20.252 9591-9616/edu.test.demo D/Test-TAG: 1 接收到值 6037
    2022-08-03 14:21:20.753 9591-9616/edu.test.demo D/Test-TAG: onCompletion 6538
    2022-08-03 14:21:25.243 9591-9622/edu.test.demo D/Test-TAG: onStart 11028
    2022-08-03 14:21:35.253 9591-9622/edu.test.demo D/Test-TAG: 2 shareIn [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    2022-08-03 14:21:35.253 9591-9622/edu.test.demo D/Test-TAG: 2 接收到值 21038
    2022-08-03 14:21:35.755 9591-9617/edu.test.demo D/Test-TAG: onCompletion 21540
    
    分析:
    • 可以看出,在设置了 stopTimeoutMillis = 500之后,接收到值得时间和onCompletion的时间基本差了500ms,其实就是就是延时结束。
    • 在设置了replayExpirationMillis = 2000之后,第二次开始和接收到值的时间基本就是发送10个值得时间,具体值为1-10,那是因为缓存的值已经失效了,此时这个replay = 5没有多大意义,因为到下次的时候值已经失效了。
    日志如下( stopTimeoutMillis = 500,replayExpirationMillis = Long.MAX_VALUE,replay = 5):
    2022-08-03 14:47:30.144 9697-9724/edu.test.demo D/Test-TAG: onStart 1030
    2022-08-03 14:47:35.154 9697-9723/edu.test.demo D/Test-TAG: 1 shareIn [1, 2, 3, 4, 5]
    2022-08-03 14:47:35.154 9697-9723/edu.test.demo D/Test-TAG: 1 接收到值 6040
    2022-08-03 14:47:35.655 9697-9723/edu.test.demo D/Test-TAG: onCompletion 6541
    2022-08-03 14:47:40.141 9697-9726/edu.test.demo D/Test-TAG: onStart 11027
    2022-08-03 14:47:45.149 9697-9727/edu.test.demo D/Test-TAG: 2 shareIn [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
    2022-08-03 14:47:45.149 9697-9727/edu.test.demo D/Test-TAG: 2 接收到值 16035
    2022-08-03 14:47:45.652 9697-9726/edu.test.demo D/Test-TAG: onCompletion 16538
    
    分析:
    • 首先看区别,第一个接收者基本没变化。
    • 第二个接收者有两点变化,接收到的值变了,不再是1-10,而是两个1-5,而且第二个onStart和接收到值的时间也变了,不再是10个值的时间,而是五个值的时间,原因主要如下:第一replayExpirationMillis值设置为 Long.MAX_VALUE(这也是其默认值)之后,第二个接收者出现时,缓存并未失效,所以出现了前面的12345(因为replay=5,缓存前五个),但是take(10)不够了,所以又出现了onStart,再接收五个,也就是后面的12345,时间也就是接收五个的时间。
    • 如果将第二个接收者的take(10)变成take(5),缓存就直接够了,就不会出现第二个onStart。
    • 当然这也是因为前面是take(5),如果改成take(3),后面也就会出现onStart,数据变成12312,那是因为虽然缓存池为5,但是只缓存了三个值,还需要两个。

    StateFlow

    简介

    和SharedFlow一样,StateFlow也是热流,但是区别在于状态的保存,保存了最新的值,也就是新的接收者会收到最新的值,
    和设置了replay = 1的SharedFlow比较类似。
    

    简单使用

    代码如下:
                val stateFlow = MutableStateFlow(0)
                launch {
                    stateFlow.collect{
                        Log.d(TAG.TAG,"stateFlow 1 collect $it")
                    }
                }
                delay(1000)
                stateFlow.value = 10
                delay(1000)
                stateFlow.value = 10
                delay(1000)
                stateFlow.value = 11
                launch {
                    stateFlow.collect{
                        Log.d(TAG.TAG,"stateFlow 2 collect $it")
                    }
                }
    
    
    日志如下:
    2022-08-03 15:43:14.618 11669-11705/edu.test.demo D/Test-TAG: stateFlow 1 collect 0
    2022-08-03 15:43:15.623 11669-11704/edu.test.demo D/Test-TAG: stateFlow 1 collect 10
    2022-08-03 15:43:17.626 11669-11705/edu.test.demo D/Test-TAG: stateFlow 1 collect 11
    2022-08-03 15:43:17.626 11669-11705/edu.test.demo D/Test-TAG: stateFlow 2 collect 11
    
    分析:
    • 可以看到接收者1刚开始收到了初始值1,那是因为StateFlow在每个接收者出现时都会接收到最新的值。
    • 接收者1后面又接收到了10,那是动态更新StateFlow的value值触发的,但是注意10发送了两次,但是只接收到了一次,说明StateFlow是天然防抖的,连续发送两次同样的值,只会接收一次,后面又接收到了11和第一个10效果一致,是value值触发的。
    • 接收者2接收到11的道理和接收者1接收到1的道理是一样的。

    stateIn操作符

    stateIn操作符是将冷流flow转换为热流StateFlow,主要参数有三个
    1、第一个为作用域。
    2、策略,分为三种Eagerly(立即发送)、Lazily(有第一个订阅者之后发送)、
    WhileSubscribed()(在第一个订阅者出现之后开始、在最后一个订阅者、消失后结束),可配置二外的参数:
    stopTimeoutMillis 为最后一个订阅者小时候保留的时长,单位ms,默认为0。
    replayExpirationMillis 为最后一个订阅者消失后,缓存保留的时长,单位ms,默认为Long.MAX_VALUE。
    3、StateFlow的初始值。
    
    策略基本和shareIn是一致的,只是replay固定为1,另外会有一个初始值。
    分析一种,其他的和shareIn类比即可,策略为WhileSubscribed()。
    
    代码如下:
                 var time = 0L
                time = System.currentTimeMillis()
                val stateFlow = (1..10).asFlow().onStart {
                    Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
                }.onCompletion {
                    Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
                }.onEach {
                    delay(1000)
                }.stateIn(this, SharingStarted.WhileSubscribed(),0)
    
                launch {
                    Log.d(TAG.TAG, "stateFlow 1 collect ${stateFlow.take(5).toList()}")
                    Log.d(TAG.TAG,"接收到值 ${System.currentTimeMillis() - time}")
                }
                delay(10*1000)
                launch {
                    Log.d(TAG.TAG, "stateFlow 2 collect ${stateFlow.take(5).toList()}")
                    Log.d(TAG.TAG,"接收到值 ${System.currentTimeMillis() - time}")
                }
    
    日志如下:
    2022-08-03 15:33:35.204 11353-11379/edu.test.demo D/Test-TAG: onStart 70
    2022-08-03 15:33:39.219 11353-11380/edu.test.demo D/Test-TAG: stateFlow 1 collect [0, 1, 2, 3, 4]
    2022-08-03 15:33:39.219 11353-11380/edu.test.demo D/Test-TAG: 接收到值 4101
    2022-08-03 15:33:39.221 11353-11378/edu.test.demo D/Test-TAG: onCompletion 4102
    2022-08-03 15:33:45.145 11353-11378/edu.test.demo D/Test-TAG: onStart 10027
    2022-08-03 15:33:49.151 11353-11378/edu.test.demo D/Test-TAG: stateFlow 2 collect [4, 1, 2, 3, 4]
    2022-08-03 15:33:49.151 11353-11378/edu.test.demo D/Test-TAG: 接收到值 14033
    2022-08-03 15:33:49.151 11353-11380/edu.test.demo D/Test-TAG: onCompletion 14033
    
    分析:
    • 可以看到接收者1接收到的值为01234,因为有一个初始值为0,占了一个位置,开始到接收到值的时间也基本是个值的时间,接收到之后也打印了onCompletion,和接收到值的时间基本一致。
    • 第二个接收者,接收到的值为41234,因为StateFlow缓存了一个最新值4,再接收四个新值,时间和接收者1类似。

    总结

    • 本篇主要介绍了SharedFlow和StateFlow的基本使用、以及参数设置相关内容。
    • 本篇也终点介绍了shareIn操作符的使用,以及各种策略参数的设置,stateIn类比shareIn理解。

    相关文章

      网友评论

          本文标题:【Koltin Flow(五)】SharedFlow及State

          本文链接:https://www.haomeiwen.com/subject/grfpwrtx.html