美文网首页
Flow简介

Flow简介

作者: Jack921 | 来源:发表于2022-12-29 18:35 被阅读0次

    Kotlin 协程中使用挂起函数可以实现非阻塞地执行任务并将结果返回回来,但是只能返回一个计算结果。但是如果希望有多个计算结果返回回来,则可以使用 flow,flow有像Rxjava的各种操作符,实现各种功能,同时和协程一起使用,可以替代Rxjava和liveData,并且也没有像Rxjava上手这么难,所以学kotlin,flow是必须的。

    flow简单使用:
    flow{
        //发送者发送数值
        emit(1)   
    }.collect{
        //接受者接受发送的数值
        println(it.toString())
    }
    

    看起来和Rxjava很像,但是又简单很多吧

    flow的冷流与热流
    • 冷流
      上面的简单使用即是冷流,即执行是惰性的,调用末端流操作符(collect 是其中之一)之前, flow{ ... } 中的代码不会执行,只有当数据被订阅的时候(执行collect),发布者才开始执行发射数据流的代码(执行flow{ ... })。当有多个订阅者的时候,每个订阅者都会收到发送者完整的流程。即订阅者和发送者都是一对一的关系。
      例子如下:


      image.png

    我们准备3个按钮,分别对应代码如下:

    //发送者代码:
    var test: Flow<Int>?=null
    test= flow {
        for (i in 0..4) {
        Log.e(TAG+"2", i.toString())
        emit(i)
    }
    
    //订阅者1代码:
    test?.collect{
        delay(1000)
        Log.e(TAG, it.toString())
    }
    
    //订阅者2代码:
    test?.collect{
        delay(1000)
        Log.e(TAG, it.toString())
    }
    
    

    上面三个按钮的代码都贴上去了,其中订阅者1和订阅者2代码一样,当我们只是点发送者按钮时,flow {...} flow里面的代码块是没有执行的,然后我们再点击订阅者1按钮,这时候发送者代码才开始执行,从而发送给订阅者,连续执行


    image.png

    当我们再点击订阅者2按钮的时候,会发现和上面的订阅者1按钮的效果一样,所以印证了一对一的关系,每个订阅者都会收到发送者完整的流程。

    • 热流
      热流是共享的,有缓存的,不管订阅者是否存在,只要发送了事件就会被消费,热流和订阅者是一对多的关系,多个订阅者可以共享同一个数据流。当一个订阅者停止监听时,数据流不会自动关闭。以MutableSharedFlow举例:
    val mutableSharedFlow=MutableSharedFlow<Int>()
        lifecycleScope.launch {
            mutableSharedFlow.collect{
                Log.e("mutableSharedFlow1",it.toString())
            }
        }
        lifecycleScope.launch {
            (1..5).forEach {
                Log.e("mutableSharedFlow1_before",it.toString())
                mutableSharedFlow.emit(it)
                Log.e("mutableSharedFlow1_after",it.toString())
            }
        }
        lifecycleScope.launch {
            delay(1000)
            mutableSharedFlow.collect {
            Log.e("mutableSharedFlow2",it.toString())
        }
    }
    

    运行结果如下:


    image.png

    可以看到,第二个订阅者是没有收到发送者的数据,因为在订阅之前已经被消费了,所以收不到数据

    热流的具体实现SharedFlow和StateFlow,分别对应的实现类MutableSharedFlow和是MutableStateFlow,所以我们要讲的也就是这两个类。

    1. MutableSharedFlow

    有缓冲区区,并可以定义缓冲区的溢出规则,可以定义给一个新的接收器发送多少数据的缓存值。
    MutableSharedFlow 的参数如下:

    • replay 相当于粘性数据
    • extraBufferCapacity //接受的慢时候,发送的入栈
    • onBufferOverflow 缓冲区溢出规则:
      1. SUSPEND: 挂起
      2. DROP_OLDEST: 移除旧的值
      3. DROP_LATEST: 移除新的值
    replay:事件粘滞数

    当我们把上面的MutableSharedFlow的replay设置为1是,即如下代码:

    val mutableSharedFlow=MutableSharedFlow<Int>(replay = 1)
    lifecycleScope.launch {
        mutableSharedFlow.collect{
            Log.e("mutableSharedFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        (1..5).forEach {
            mutableSharedFlow.emit(it)
        }
    }
    lifecycleScope.launch {
        delay(1000)
        mutableSharedFlow.collect {
            Log.e("mutableSharedFlow2",it.toString())
        }
    }
    

    运行结果如下:


    image.png

    可以看到,第二个订阅者收到了最后一次运行的结果5,所以replay会保留上次运行的结果,replay设置多少,他就保留最新的前多少数据。

    extraBufferCapacity

    缓存容量,就是先发送几个事件,不管已经订阅的消费者是否接收,都先发送先。

    val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2)
    lifecycleScope.launch {
        mutableSharedFlow.collect{
            Log.e("mutableSharedFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        (1..5).forEach {
            Log.e("mutableSharedFlow1_before",it.toString())
            mutableSharedFlow.emit(it)
            Log.e("mutableSharedFlow1_after",it.toString())
        }
    }
    lifecycleScope.launch {
        delay(1000)
        mutableSharedFlow.collect {
            Log.e("mutableSharedFlow2",it.toString())
        }
    }
    

    运行结果如下:


    image.png

    相对于上面,可以看到extraBufferCapacity设置2之后,头两个会先发送而不管有没有被消费完,超过第3个之后,才开始执行,执行完之后又先发送先发送两个而不管有没有被消费。

    onBufferOverflow

    因为有第二个参数,所以当没有被消费完的时候,这可能导致缓存容量过多,只管发不管消费者消费能力的情况就会出现背压,所以第3个参数就是出现背压的时候要怎么处理的。

    分别是 SUSPEND: 挂起,DROP_OLDEST: 移除旧的值,DROP_LATEST: 移除新的值。

    SUSPEND

    因为默认就是SUSPEND,所以上面的MutableSharedFlow<Int>(extraBufferCapacity = 2)就是MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.SUSPEND),所以和讲extraBufferCapacity的demo是一样的。

    DROP_OLDEST
    移除旧的值,保留最新的,extraBufferCapacity就保留多少,代码如下:

    val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_OLDEST)
    lifecycleScope.launch {
        mutableSharedFlow.collect{
             Log.e("mutableSharedFlow1",it.toString())
        }
    }
    lifecycleScope.launch {
        (1..5).forEach {
            Log.e("mutableSharedFlow1_before",it.toString())
            mutableSharedFlow.emit(it)
            Log.e("mutableSharedFlow1_after",it.toString())
        }
    }
    lifecycleScope.launch {
        delay(1000)
        mutableSharedFlow.collect {
            Log.e("mutableSharedFlow2",it.toString())
        }
    }
    

    运行效果如下:

    image.png

    可以看到运行5个,超出缓存容量,只保留最新的两个,这就实现了消费者消费速度小于生产者的时候的背压问题。

    DROP_LATEST

    移除新的值,保留最旧的,extraBufferCapacity就保留多少,代码如下:

    val mutableSharedFlow=MutableSharedFlow<Int>(extraBufferCapacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST)
        lifecycleScope.launch {
            mutableSharedFlow.collect{
                Log.e("mutableSharedFlow1",it.toString())
            }
        }
        lifecycleScope.launch {
            (1..5).forEach {
                Log.e("mutableSharedFlow1_before",it.toString())
                mutableSharedFlow.emit(it)
                Log.e("mutableSharedFlow1_after",it.toString())
            }
        }
        lifecycleScope.launch {
            delay(1000)
            mutableSharedFlow.collect {
                Log.e("mutableSharedFlow2",it.toString())
            }
        }
    

    运行结果如下:

    image.png

    可以看到运行5个,超出缓存容量,只保留最旧的两个。

    2.MutableStateFlow

    MutableStateFlow 就是reply为1的MutableSharedFlow,同时它必须要有一个初始值,此外每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值。具体demo如下:

    val stateFlow =MutableStateFlow(value = -1)
        lifecycleScope.launch {
            stateFlow.collect{
                Log.e("mutableStateFlow1",it.toString())
            }
        }
        lifecycleScope.launch {
            listOf(1,2,3,4,4).forEach {
                Log.e("mutableStateFlow_before",it.toString())
                stateFlow.emit(it)
                Log.e("mutableStateFlow_after",it.toString())
            }
        }
        lifecycleScope.launch {
            delay(1000)
            stateFlow.collect {
            Log.e("mutableStateFlow2",it.toString())
        }
    }
    

    运行结果如下:


    image.png

    可以看到,只要初始值和最新值,其他的值都不会,StateFlow重点在状态,只有初始值和最新值,而不会有中间值,这对于UI的状态更合适,防止重复刷新,而SharedFlow更适合事件的处理。

    背压三剑客

    从上面的讲解里,我们了解了MutableSharedFlow和MutableStateFlow的背压。
    那冷流要怎么实现呢,其实操作符也有背压处理的。

    背压说白了就是消费者的消费速度达不到生产者的创建速度时,就会产生数据的淤积。

    • collectLatest
      当出现背压是,只会执行最新的数据,代码如下:
    flow {
        (1..5).forEach{
            emit(it)
        }
    }.collectLatest {
        Log.e("collectLatest_start",it.toString())
        delay(1000)
        Log.e("collectLatest_end",it.toString())
     }
    

    运行结果如下:

    image.png

    可以看到,会结束旧的数据执行即使在执行中,而执行最新的数据

    • conflate
      conflate 与collectLatest不同的是,conflate会先把旧的执行完,再去执行最新的数据,保证每次执行有始有终,而不会中途中断
    flow {
        (1..5).forEach{
             emit(it)
        }
    }.conflate()
     .collect {
        Log.e("conflate_start",it.toString())
        delay(1000)
         Log.e("conflate_end",it.toString())
     }
    

    运行结果:

    image.png

    可以看到,对1数据也执行到结束才执行5,中间的数据直接过滤掉,有始有终

    • buffer
      首先,buffer的数据发送就不会受collect函数的影响,不用等collect执行完后才发送下一条,其二,buffer也有点像MutableSharedFlow,有两个参数,分别是capacity和onBufferOverflow,
    1. capacity: 缓存数量
    2. onBufferOverflow: 处理缓存策略

    下面一个个验证,首先看发送,demo如下:

    flow {
        (1..5).forEach {
            emit(it)
        }
    }.onEach {
        Log.e("buffer1","$it is ready")
    }.collect {
        delay(1000)
        Log.e("buffer2","$it is handled")
    }
    

    结果如下:


    image.png

    可以看到,执行流程是先发一个,执行完再发下一个,事件发送和处理是连续的,假如加上buffer()呢,

    flow {
        (1..5).forEach {
            emit(it)
        }
    }.onEach {
        Log.e("buffer1","$it is ready")
    }.buffer()
    .collect {
        delay(1000)
        Log.e("buffer2","$it is handled")
    }
    

    结果如下:


    image.png

    可以看到加了之后不管有没有执行,都先发送,然后再一个个执行,不再受collect{}影响。

    其二说白了就是设置缓存数量和处理策略,即设置capacity和onBufferOverflow,和上面的MutableSharedFlow有点像,举一个例子基本可以。

    flow {
        (1..5).forEach {
            emit(it)
        }
    }.onEach {
        Log.e("buffer1","$it is ready")
    }
    .buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
    .collect {
        delay(1000)
        Log.e("buffer2","$it is handled")
    }
    

    结果如下:


    image.png

    设置缓存数量为1,保持处理最旧的事件,DROP_OLDEST(处理缓存最旧),其他的SUSPEND(挂起),DROP_OLDEST(处理缓存最新)

    相关文章

      网友评论

          本文标题:Flow简介

          本文链接:https://www.haomeiwen.com/subject/awvgqdtx.html