美文网首页
Kotlin协程——Flow(一)

Kotlin协程——Flow(一)

作者: Deck方 | 来源:发表于2023-07-16 01:28 被阅读0次

    Flow是一种类似于序列的冷流(冷启动l,即lazy的),flow构建器中的代码直接到流被收集的时候才运行。

    一、流的特性

    • 流的每次单独收集都是按顺序执行的,除非使用特殊的操作符。
    • 从上游到下游每个过度操作符都会处理每个发射出的值,然后再交给末端操作符
      下面距离说明下
    fun testFlow() = runBlocking<Unit> {
            (1..5).asFlow().filter {
                it%2==0
            }.map {
                "string $it"
            }.collect {
                println("Collect $it")
            }
        }
    

    二、流的上下文

    • 流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。
    • flow{}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)。
    • flowOn操作符,该函数用于更改流发射的上下文。
    fun testFlowOn() = runBlocking<Unit> {
            flow<Int> {
                println("start ${Thread.currentThread().name}")
                for (i in 1..3) {
                    delay(1000)
                    emit(i)
                }
            }
                .flowOn(Dispatchers.IO+CoroutineName("IOWorker"))
                .collect {
                    println("Collect $it ${Thread.currentThread().name}")
                }
        }
    //print result
    //start DefaultDispatcher-worker-1 @IOWorker#2
    //Collect 1 main @coroutine#1
    //Collect 2 main @coroutine#1
    //Collect 3 main @coroutine#1
    
    
    • 使用launchIn函数指定协程中收集流 ,launchIn函数返回一个Job对象
        fun testLaunchIn() = runBlocking<Unit> {
            flow<Int> {
                println("start ${Thread.currentThread().name}")
                for (i in 1..3) {
                    delay(1000)
                    emit(i)
                }
            }
                .flowOn(Dispatchers.IO + CoroutineName("IOWorker"))
                .onEach {
                    println("onEach $it ${Thread.currentThread().name}")
                }
                .launchIn(CoroutineScope(Dispatchers.Default + CoroutineName("LaunchWorker")))
                .join()
        }
    //print result
    //start DefaultDispatcher-worker-2 @IOWorker#3
    //onEach 1 DefaultDispatcher-worker-1 @LaunchWorker#2
    //onEach 2 DefaultDispatcher-worker-2 @LaunchWorker#2
    //onEach 3 DefaultDispatcher-worker-1 @LaunchWorker#2
    

    三、流的取消、流的取消检测

    • 流构建器对每个发射执行附加的ensureActive检测以进行取消,这意味着从flow{...}发出的繁忙循环是可以取消的。
    • 出于性能原因,大多数流操作不会执行其他取消检测,在协程处于繁忙的循环情况下,必须明确检测是否可以取消(通过cancellable操作符执行此操作)。
    //如果不使用cancellable操作符 取消会失败,因为协程比较繁忙。
    fun testCancel() = runBlocking<Unit> {
    
            flow<Int> {
                for (i in 1..5) {
                    println("emit $i")
                    emit(i)
                }
            }.cancellable()
                .collect {
                    println(it)
                    if (it == 3) {
                        cancel()
                    }
                }
    
        }
    //print result 
    //emit 1
    //1
    //emit 2
    //2
    //emit 3
    

    四、背压

    水流受到与流动方向一致的压力称为背压。—>(背压) =>(水流)
    在生产者与消费者模型(Flow)中,生产者的生产速度大于消费者的的消费速度就会产生背压。

    • buffer(),实现并发运行流中发射元素的代码。
    • conflate(),合并发射项,不对每个值进行处理。
    • collectLatest(),取消并重新发射最后一个值。
    • 当必须更改CoroutineDispatcher时,flowOn操作符使用了同样的缓冲机制(buffer),但是buffer函数显示的请求缓冲而不改变执行上下文。
    fun testBuffer() = runBlocking<Unit> {
            val duration = measureTimeMillis {
                flow<Int> {
                    for (i in 1..5) {
                        delay(100)
                        emit(i)
                    }
                }
                    .buffer(10)
                    .collect {
                        delay(300)
                        println("Collected ${Thread.currentThread().name}")
                    }
            }
            println("total duration $duration")
        }
    //print result 
    //...
    //total duration 1811
    //总时间少于2000ms
    
    
    //conflate()处理最新值
    fun testConflate() = runBlocking<Unit> {
            val duration = measureTimeMillis {
                flow<Int> {
                    for (i in 1..5) {
                        delay(100)
                        println("emit  $i ")
                        emit(i)
                    }
                }
                    .conflate()
                    .collect {
                        delay(300)
                        println("Collected  $it in ${Thread.currentThread().name}")
                    }
            }
            println("total duration $duration")
        }
    //emit  1 
    //emit  2 
    //emit  3 
    //Collected  1 in main @coroutine#1
    //emit  4 
    //emit  5 
    //Collected  3 in main @coroutine#1
    //Collected  5 in main @coroutine#1
    //total duration 1249
    
    //collectLatest 收集最后一个值
    fun testCollectLatest() = runBlocking<Unit> {
            val duration = measureTimeMillis {
                flow<Int> {
                    for (i in 1..4) {
                        delay(100)
                        println("emit  $i ")
                        emit(i)
                    }
                }
                    .collectLatest {
                        delay(300)
                        println("Collected  $it in ${Thread.currentThread().name}")
                    }
            }
            println("total duration $duration")
        }
    //print result 
    //emit  1 
    //emit  2 
    //emit  3 
    //emit  4 
    //Collected  4 in main @coroutine#6
    //total duration 940
    

    相关文章

      网友评论

          本文标题:Kotlin协程——Flow(一)

          本文链接:https://www.haomeiwen.com/subject/ijeiudtx.html