美文网首页
kotlin Flow异步流

kotlin Flow异步流

作者: Bfmall | 来源:发表于2023-08-11 07:43 被阅读0次

    Flow: 是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。
    流的连续性:流的每次单独收集都是按顺序执行的,除非使用特殊操作符。
    从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。

    flow构建器创建一个函数
    返回多个值,而且是异步的,不是一次性返回

    (1)构建流的三种方式

    // flow构建器创建一个函数
    // 返回多个值,而且是异步的,不是一次性返回
    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            delay(1000)
            emit(i) // 发射,产生一个元素
        }
    }
    
    runBlocking {
        // Flow构建方式1
        simpleFlow().collect { value -> println(value) } // 收集元素
    
        // Flow构建方式2
        (1..5).asFlow().filter {
            it % 2 == 0
        }.map {
            println("Map $it")
        }.onEach {
            delay(1000)
        }.collect {
            println("Collect $it")
        }
    
        // Flow构建方式3
        flowOf("one", "two", "three").onEach { delay(1000) }.collect { values ->
            println(values)
        }
    }
    

    (2)流的上下文

        // Flow上下文验证
        (1..5).asFlow().filter {
            println("当前线程-filter:" + Thread.currentThread().name)
            it % 2 == 0
        }.map {
            println("当前线程-map:" + Thread.currentThread().name)
        }.onEach {
            delay(1000)
        }.collect {
            println("当前线程-collect:" + Thread.currentThread().name)
            println("Collect $it")
        }
    

    从打印结果上看,上游和下游都是在主线程。
    但是,一般情况下,Flow构建之后的代码块中是耗时操作,所以不能放在主线程,解决方案是:在Flow构建器后面添加 flowOn(Dispatchers.Default),改造后的代码如下:

    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            delay(1000)
            emit(i) // 发射,产生一个元素
        }
    }.flowOn(Dispatchers.Default)
    
    fun main() {
        runBlocking {
            // Flow构建方式1
            simpleFlow().collect { value -> println(value) } // 收集元素
    
            // Flow构建方式2
            (1..5).asFlow().filter {
                println("当前线程-filter:" + Thread.currentThread().name)
                it % 2 == 0
            }.map {
                println("当前线程-map:" + Thread.currentThread().name)
            }.onEach {
                delay(1000)
            }.flowOn(Dispatchers.Default).collect {
                println("当前线程-collect:" + Thread.currentThread().name)
                println("Collect $it")
            }
    
            // Flow构建方式3
            flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
                println(values)
            }
    
        }
    }
    

    (3)启动流

    启动流:launchIn传入协程作用域形参,使用launchIn替换collect我们可以在指定协程中启动流的收集

        (1..5).asFlow().onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
    
    
        (1..5).asFlow().onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).launchIn(this).join()
    

    (4)流的取消

    使用 withTimeoutOrNull 方式取消:

    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            delay(1000)
            emit(i) // 发射,产生一个元素
        }
    }.flowOn(Dispatchers.Default)
    
    fun main() {
        runBlocking {
    
            withTimeoutOrNull(2000) {
                // Flow构建方式1
                simpleFlow().collect { value -> println(value) } // 收集元素
            }
    
            withTimeoutOrNull(2000) {
                (1..5).asFlow().onEach {
                    delay(1000)
                }.flowOn(Dispatchers.Default).collect {
                    println("Collect $it")
                }
            }
    
            withTimeoutOrNull(2000) {
                flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
                    println(values)
                }
            }
    
            withTimeoutOrNull(2000) {
                (1..5).asFlow().onEach {
                    delay(1000)
                }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
            }
    
            println("Done...")
    
        }
    }
    

    另外,启动流还可以调用 cancelAndJoin 取消。

        val job = (1..5).asFlow().onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO))
        delay(1000)
        job.cancelAndJoin()
    

    (5)流的取消检测

    为方便起见,流构建器对每个发射值执行附加的ensureActive 检测以进行取消,这意味着从 flow{...} 发出的繁忙循环是可以取消的。
    出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消。
    通过cancellable操作符来执行此操作。

    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..5) {
            delay(1000)
            emit(i) // emit自带检测是否取消的能力
        }
    }.flowOn(Dispatchers.Default)
    
    fun main() {
        runBlocking {
    
            // emit 自带检测是否取消的能力
            simpleFlow().collect { value ->
                if (value == 3) cancel()
            }
    
            // 如果没有emit,需要使用 cancellable
            (1..5).asFlow().cancellable().onEach {
                delay(1000)
            }.flowOn(Dispatchers.Default).collect { value ->
                if (value == 3) cancel()
            }
    
        }
    }
    

    (6)背压

    背压:水流受到与流动方向一致的压力。
    生产者、消费者模式,只要生产效率 > 消费效率,那么就会产生背压。

    处理背压的方式有:
    buffer(),并发运行流中发射元素的代码
    conflate(),合并发射项,不对每个值进行处理
    collectLatest(),取消并重新发射最后一个值
    当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显示地请求缓冲而不改变执行上下文。

    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..50) {
            println("发送数据:$i")
            delay(100)
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            val time = measureTimeMillis {
                simpleFlow()
                    .collect { value ->
                    delay(300)
                    println("接收数据:$value")
                }
            }
            println("耗时:$time")
        }
    }
    

    以上代码,发送数据和接收数据都是在同一个线程中并行执行,如果存在耗时程序,将特别影响效率。

    为了增加执行效率,可以使用 buffer 设置缓存大小,从而起到加快执行速率的效果。

        val time = measureTimeMillis {
            // 背压
            simpleFlow()
                .buffer(10)
                .collect { value ->
                delay(300)
                println("接收数据:$value")
            }
        }
    

    但是,从生产者/消费者的设计思想的角度上考虑,发送数据最好放在子线程。

        val time = measureTimeMillis {
            // 背压
            simpleFlow()
                .flowOn(Dispatchers.Default)
                .collect { value ->
                delay(300)
                println("接收数据:$value")
            }
        }
    

    使用 flowOn 可以指定 Flow 的协程作用域,这样可以将 并行 转成 并发,从而加快执行效率。

    runBlocking {
        val time = measureTimeMillis {
            // 背压
            simpleFlow()
                .conflate()
                .collect { value ->
                delay(300)
                println("接收数据==:$value")
            }
        }
        println("耗时:$time")
    }
    

    以上代码使用 conflate,中间一些元素不会处理,从而加快执行效率。

        val time = measureTimeMillis {
            // 背压
            simpleFlow()
                .collectLatest { value ->
                delay(300)
                println("接收数据==:$value")
            }
    

    以上代码将 collect 改成 collectLatest 之后,只会处理最后一个值,从而加速执行速度。

    (7)转换操作符

    使用map转换:

    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            println(i)
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            simpleFlow()
                .map { value ->
                    "response $value"
                }
                .collect { value ->
                    println(value)
                }
        }
    }
    

    使用transform转换,可以转换成任意次、任意值的Flow:

    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            println(i)
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            simpleFlow()
                .transform { request ->
                    emit("request $request")
                    emit("request $request")
                }
                .collect { value ->
                    println(value)
                }
        }
    }
    

    (8)限长操作符

    take 是限长操作符,可以限制处理的数量:

    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            println(i)
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            simpleFlow()
                .take(2)
                .collect { value ->
                    println(value)
                }
        }
    }
    

    (9)末端操作符

    末端操作符是在流上用于 启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更加方便使用的末端操作符:

    转化为各种集合,例如:toList与toSet。
    获取第一个(first)值与确保流发射单个(single)值的操作符。
    使用reduce与fold将流规约到单个值。

    fun main() {
        runBlocking {
            val sum = simpleFlow()
                .reduce { a, b ->
                    a + b
                }
            println(sum)
        }
    }
    

    reduce 操作符可以将元素累加。
    reduce的返回值类型必须和集合的元素类型相符。

    suspend fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            val newStr = simpleFlow()
                .fold(StringBuilder()) { str: StringBuilder, a: Int ->
                    str.append(a).append(" ")
                }
            println(newStr)
        }
    }
    

    而fold的返回值类型则不受约束。

    (10)组合操作符

    zip 操作符将两个流合并。

    runBlocking {
        val nums1 = (1..3).asFlow()
        val nums2 = flowOf("one", "two", "three")
        nums1.zip(nums2) {a, b ->
            "$a $b"
        }.collect {value->
            println(value)
        }
    }
    

    (11)展平操作符

    流表示异步接收的值序列,所以很容易遇到这种情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,为此,存在一系列的流展平操作符:

    flatMapConcat:连接模式
    flatMapMerge:合并模式
    flatMapLatest: 最新展平模式

    suspend fun requestFlow(i: Int) = flow<String> {
        emit("request $i first")
        delay(500)
        emit("request $i second")
    }
    
    fun main() {
        runBlocking {
            val startTime = System.currentTimeMillis()
            (1..3).asFlow()
                .onEach { delay(100) }
                .flatMapConcat {
                    requestFlow(it) // Flow的元素是Flow
                }
                .collect { value->
                println("$value -- ${System.currentTimeMillis() - startTime}")
            }
        }
    }
    

    代码中 flatMapConcat 可以换成 flatMapMerge 或者 flatMapLatest。

    三者的执行结果是:

    flatMapConcat :(requestFlow全部执行完)

    request 1 first -- 198
    request 1 second -- 701
    request 2 first -- 815
    request 2 second -- 1319
    request 3 first -- 1428
    request 3 second -- 1932
    

    flatMapMerge:(不需要等待requestFlow全部执行完)

    request 1 first -- 281
    request 2 first -- 361
    request 3 first -- 470
    request 1 second -- 798
    request 2 second -- 876
    request 3 second -- 985
    

    flatMapLatest:

    request 1 first -- 250
    request 2 first -- 376
    request 3 first -- 485
    request 3 second -- 1001
    

    (12)流的异常处理

    suspend fun requestFlow() = flow<Int> {
        for (i in 1..3) {
            emit(i)
            throw RuntimeException("exception")
        }
    }.catch {e: Throwable ->
        println("上游异常捕获:" + e.message)
    }
    
    fun main() {
        runBlocking {
            try {
                requestFlow()
                    .collect { value->
                        check(value < 2) // 检查异常
                        println(value)
                    }
            } catch (e: Throwable) {
                println("下游异常捕获:" + e.message)
            }
        }
    }
    

    check:检查异常,一旦检查到异常,程序crash。
    下游通过 try...catch 捕获异常,上游Flow自带 catch 函数。

    (13)流的完成

    收集完成时,使用 finally,表示收集完成。

    suspend fun requestFlow() = flow<Int> {
        for (i in 1..3) {
            emit(i)
        }
    }
    
    fun main() {
        runBlocking {
            try {
                requestFlow().collect { value-> println(value) }
            } finally {
                println("...完成...")
            }
        }
    }
    

    使用 onCompletion 也可以表示完成:

    suspend fun requestFlow() = flow<Int> {
        for (i in 1..3) {
            emit(i)
            throw RuntimeException("exception")
        }
    }.catch {exception->
        println("catch -> exception:" + exception.message)
    }
    
    fun main() {
        runBlocking {
    
            requestFlow()
                .onCompletion {exception ->
                    if (exception != null) { // 异常导致完成
                        println("finish -> exception:" + exception.message)
                    } else { // 正常结束
                        println("正常结束")
                    }
                }
                .collect { value-> println(value) }
    
        }
    }
    

    onCompletion 可以拿到异常信息,但是不能捕获异常。

    (14)Flow实现多路复用

    多数情况下,我们可以通过构造合适的Flow来实现多路复用的效果。

    data class User(val name: String)
    data class Response<T>(val value: T, val isLocal: Boolean)
    suspend fun CoroutineScope.getUserForLocal(name: String) = async {
        delay(1000)
        User(name)
    }
    
    suspend fun CoroutineScope.getUserFromRemote(name: String) = async {
        delay(100)
        User(name)
    }
    
    fun main() {
        runBlocking {
            val name = "guest"
            // 两个函数
            listOf(::getUserForLocal, ::getUserFromRemote)
                .map { function->
                    function.call(name)
                }
                .map { deferred ->
                    flow { emit(deferred.await()) }
                }.merge().collect { user -> println(user) }
    
        }
    }
    

    以上代码用到了反射,需要引入依赖:

    implementation 'org.jetbrains.kotlin:kotlin-reflect:1.0.6'
    

    作者:NoBugException
    链接:https://www.jianshu.com/p/7e5562432ab3
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    相关文章

      网友评论

          本文标题:kotlin Flow异步流

          本文链接:https://www.haomeiwen.com/subject/hydzpdtx.html