美文网首页
Kotlin协程——Flow(一)

Kotlin协程——Flow(一)

作者: Deck方 | 来源:发表于2023-07-16 01:28 被阅读0次

Flow是一种类似于序列的冷流(冷启动l,即lazy的),flow构建器中的代码直接到流被收集的时候才运行。

一、流的特性

  • 流的每次单独收集都是按顺序执行的,除非使用特殊的操作符。
  • 从上游到下游每个过度操作符都会处理每个发射出的值,然后再交给末端操作符
    下面距离说明下
fun testFlow() = runBlocking<Unit> {
        (1..5).asFlow().filter {
            it%2==0
        }.map {
            "string $it"
        }.collect {
            println("Collect $it")
        }
    }

二、流的上下文

  • 流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。
  • flow{}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)。
  • flowOn操作符,该函数用于更改流发射的上下文。
fun testFlowOn() = runBlocking<Unit> {
        flow<Int> {
            println("start ${Thread.currentThread().name}")
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
            .flowOn(Dispatchers.IO+CoroutineName("IOWorker"))
            .collect {
                println("Collect $it ${Thread.currentThread().name}")
            }
    }
//print result
//start DefaultDispatcher-worker-1 @IOWorker#2
//Collect 1 main @coroutine#1
//Collect 2 main @coroutine#1
//Collect 3 main @coroutine#1

  • 使用launchIn函数指定协程中收集流 ,launchIn函数返回一个Job对象
    fun testLaunchIn() = runBlocking<Unit> {
        flow<Int> {
            println("start ${Thread.currentThread().name}")
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
            .flowOn(Dispatchers.IO + CoroutineName("IOWorker"))
            .onEach {
                println("onEach $it ${Thread.currentThread().name}")
            }
            .launchIn(CoroutineScope(Dispatchers.Default + CoroutineName("LaunchWorker")))
            .join()
    }
//print result
//start DefaultDispatcher-worker-2 @IOWorker#3
//onEach 1 DefaultDispatcher-worker-1 @LaunchWorker#2
//onEach 2 DefaultDispatcher-worker-2 @LaunchWorker#2
//onEach 3 DefaultDispatcher-worker-1 @LaunchWorker#2

三、流的取消、流的取消检测

  • 流构建器对每个发射执行附加的ensureActive检测以进行取消,这意味着从flow{...}发出的繁忙循环是可以取消的。
  • 出于性能原因,大多数流操作不会执行其他取消检测,在协程处于繁忙的循环情况下,必须明确检测是否可以取消(通过cancellable操作符执行此操作)。
//如果不使用cancellable操作符 取消会失败,因为协程比较繁忙。
fun testCancel() = runBlocking<Unit> {

        flow<Int> {
            for (i in 1..5) {
                println("emit $i")
                emit(i)
            }
        }.cancellable()
            .collect {
                println(it)
                if (it == 3) {
                    cancel()
                }
            }

    }
//print result 
//emit 1
//1
//emit 2
//2
//emit 3

四、背压

水流受到与流动方向一致的压力称为背压。—>(背压) =>(水流)
在生产者与消费者模型(Flow)中,生产者的生产速度大于消费者的的消费速度就会产生背压。

  • buffer(),实现并发运行流中发射元素的代码。
  • conflate(),合并发射项,不对每个值进行处理。
  • collectLatest(),取消并重新发射最后一个值。
  • 当必须更改CoroutineDispatcher时,flowOn操作符使用了同样的缓冲机制(buffer),但是buffer函数显示的请求缓冲而不改变执行上下文。
fun testBuffer() = runBlocking<Unit> {
        val duration = measureTimeMillis {
            flow<Int> {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                }
            }
                .buffer(10)
                .collect {
                    delay(300)
                    println("Collected ${Thread.currentThread().name}")
                }
        }
        println("total duration $duration")
    }
//print result 
//...
//total duration 1811
//总时间少于2000ms

//conflate()处理最新值
fun testConflate() = runBlocking<Unit> {
        val duration = measureTimeMillis {
            flow<Int> {
                for (i in 1..5) {
                    delay(100)
                    println("emit  $i ")
                    emit(i)
                }
            }
                .conflate()
                .collect {
                    delay(300)
                    println("Collected  $it in ${Thread.currentThread().name}")
                }
        }
        println("total duration $duration")
    }
//emit  1 
//emit  2 
//emit  3 
//Collected  1 in main @coroutine#1
//emit  4 
//emit  5 
//Collected  3 in main @coroutine#1
//Collected  5 in main @coroutine#1
//total duration 1249
//collectLatest 收集最后一个值
fun testCollectLatest() = runBlocking<Unit> {
        val duration = measureTimeMillis {
            flow<Int> {
                for (i in 1..4) {
                    delay(100)
                    println("emit  $i ")
                    emit(i)
                }
            }
                .collectLatest {
                    delay(300)
                    println("Collected  $it in ${Thread.currentThread().name}")
                }
        }
        println("total duration $duration")
    }
//print result 
//emit  1 
//emit  2 
//emit  3 
//emit  4 
//Collected  4 in main @coroutine#6
//total duration 940

相关文章

网友评论

      本文标题:Kotlin协程——Flow(一)

      本文链接:https://www.haomeiwen.com/subject/ijeiudtx.html