美文网首页
Kotlin协程之Flow的使用与原理

Kotlin协程之Flow的使用与原理

作者: 长点点 | 来源:发表于2023-08-24 09:49 被阅读0次

    Flow的定义和特点

    Flow是一种数据流,可以用于协程间的通信,具有冷、懒、响应式等特点。Flow是基于协程构建的,可以提供多个值。Flow在概念上类似于一个数据序列,但它可以使用挂起函数来异步地产生和消费值。这意味着,例如,Flow可以安全地发起网络请求来产生下一个值,而不会阻塞主线程。

    Flow的特点主要有以下几点:

    Flow的创建和操作

    Flow示意图

    Flow可以通过多种方式创建,最简单的方式是使用flow{}构建器函数,在其中使用emit函数手动发射值。例如,下面的代码创建了一个发射1到3的整数值的Flow:

    // 创建一个Flow<Int>
    fun simple(): Flow<Int> = flow {
        // 发射1到3
        for (i in 1..3) {
            emit(i) // 发射下一个值
        }
    }
    
    

    复制

    除了flow{}构建器函数外,还有一些其他方式可以创建Flow,例如:

    • 使用flowOf()函数创建一个包含固定元素的Flow。
    • 使用asFlow()扩展函数将各种集合和序列转换为Flow。
    • 使用channelFlow()构建器函数创建一个基于通道(Channel)的Flow。
    • 使用callbackFlow()构建器函数创建一个基于回调(Callback)的Flow。

    创建好Flow后,可以使用各种操作符对数据进行处理。操作符分为两类:

    • 中间操作符:中间操作符用于对数据进行转换、过滤、组合等操作,但不会终止流。中间操作符返回一个新的Flow,可以链式调用多个中间操作符。例如,filter、map、take等操作符都是中间操作符。
    • 终止操作符:终止操作符用于对数据进行收集、聚合、统计等操作,并终止流。终止操作符返回一个非Flow类型的结果,并触发流的执行。例如,collect、first、toList等操作符都是终止操作符。

    例如,下面的代码使用了map和filter两个中间操作符对simple()函数返回的Flow进行了转换和过滤,并使用了collect终止操作符对结果进行了打印:

    // 对simple()返回的Flow进行处理
    fun main() = runBlocking<Unit> {
        // 启动并发协程以验证主线程并未阻塞
        launch {
            for (k in 1..3) {
                println("I'm not blocked $k")
                delay(100)
            }
        }
        // 收集这个流
        simple()
            .map { it * it } // 数字求平方
            .filter { it % 2 == 0 } // 过滤偶数
            .collect { value -> // 终止操作符
                println(value) // 打印结果
            }
    }
    
    

    复制

    输出结果为:

    I'm not blocked 1
    4
    I'm not blocked 2
    I'm not blocked 3
    
    

    复制

    可以看到,Flow的执行是在协程中进行的,不会阻塞主线程。同时,Flow的操作符也是挂起函数,可以在其中进行异步操作,例如:

    // 对simple()返回的Flow进行处理
    fun main() = runBlocking<Unit> {
        // 收集这个流
        simple()
            .map { request(it) } // 模拟异步请求
            .collect { value -> // 终止操作符
                println(value) // 打印结果
            }
    }
    
    // 模拟异步请求,返回字符串
    suspend fun request(i: Int): String {
        delay(1000) // 延迟1秒
        return "response $i"
    }
    
    

    复制

    输出结果为:

    response 1
    response 2
    response 3
    
    

    复制

    可以看到,每个请求都延迟了1秒,但是不会阻塞主线程或其他请求。

    Flow的生命周期和异常处理

    Flow提供了一些回调函数来监听流的生命周期,例如:

    • onStart:在流开始收集之前调用,可以用于执行一些初始化操作,例如打开文件或数据库连接等。
    • onEach:在每个元素被发射之后调用,可以用于执行一些通用操作,例如日志记录或更新UI等。
    • onCompletion:在流完成收集之后调用,无论是正常完成还是异常终止。可以用于执行一些清理操作,例如关闭文件或数据库连接等。onCompletion可以接收一个可空的Throwable参数,表示流终止的原因,如果为null,则表示正常完成。

    例如,下面的代码使用了onStart和onCompletion两个回调函数来打印流的开始和结束时间:

    // 对simple()返回的Flow进行处理
    fun main() = runBlocking<Unit> {
        // 收集这个流
        simple()
            .onStart { println("Flow started at ${System.currentTimeMillis()}") } // 开始回调
            .onCompletion { println("Flow completed at ${System.currentTimeMillis()}") } // 结束回调
            .collect { value -> // 终止操作符
                println(value) // 打印结果
            }
    }
    
    

    复制

    输出结果为:

    Flow started at 1632828678656
    1
    2
    3
    Flow completed at 1632828678657
    
    

    复制

    可以看到,流的开始和结束时间都被打印出来了。

    Flow也提供了一些方式来处理异常,例如:

    • catch:在流发生异常时调用,可以用于捕获和处理异常,并决定是否继续或终止流。catch操作符必须放在可能发生异常的操作符之后,否则无法捕获异常。
    • try-catch:在收集流时使用try-catch块包裹collect操作符,可以用于捕获和处理异常,并决定是否继续或终止程序。try-catch块可以捕获任何位置发生的异常。

    例如,下面的代码使用了catch和try-catch两种方式来处理异常:

    
    // 创建一个可能发生异常的Flow<Int>
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // 发射下一个值
        }
        throw RuntimeException() // 抛出异常
    }
    
    // 对foo()返回的Flow进行处理
    fun main() = runBlocking<Unit> {
        // 使用catch操作符捕获异常,并打印错误信息,然后继续发射-1作为错误标识
        foo()
            .catch { e -> println("Caught $e") } // 捕获异常
            .emit(-1) // 发射错误标识
            .collect { value ->
                println(value)
            }
        println("Done")
    // 使用try-catch块捕获异常,并打印错误信息,然后终止程序
        try {
            foo().collect { value ->
                println(value)
            }
        } catch (e: Throwable) {
            println("Caught $e")
        }
        println("Done")
    }
    

    输出结果为:

    Emitting 1
    1
    Emitting 2
    2
    Emitting 3
    3
    Caught java.lang.RuntimeException
    -1
    Done
    Emitting 1
    1
    Emitting 2
    2
    Emitting 3
    3
    Caught java.lang.RuntimeException
    Done
    

    可以看到,catch操作符可以在流中处理异常,并继续发射值,而try-catch块可以在程序中处理异常,并终止程序。

    Flow的线程切换

    Flow提供了一些操作符来切换上游和下游的上下文,例如:

    • flowOn:flowOn操作符用于切换上游的上下文,也就是说,它会影响flow{}构建器函数和之前的中间操作符的执行上下文。flowOn操作符可以用于将耗时的计算操作放在后台线程中执行,而不影响主线程。
    • launchIn:launchIn操作符用于切换下游的上下文,也就是说,它会影响collect操作符和之后的中间操作符的执行上下文。launchIn操作符可以用于将收集操作放在协程中执行,而不阻塞当前线程。

    例如,下面的代码使用了flowOn和launchIn两个操作符来切换上下文:

    
    // 创建一个Flow<Int>
    fun simple(): Flow<Int> = flow {
        // 发射1到3,并打印当前线程名
        for (i in 1..3) {
            Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
            log("Emitting $i")
            emit(i) // 发射下一个值
        }
    }.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文
    
    // 对simple()返回的Flow进行处理
    fun main() = runBlocking<Unit> {
        // 收集这个流,并打印当前线程名
        simple()
            .collect { value ->
                log("Collected $value")
            }
        println("Done")
    
        // 使用launchIn操作符将收集操作放在协程中执行,并打印当前线程名
        simple()
            .onEach { value ->
                log("Collected $value")
            }
            .launchIn(this) // 在单独的协程中收集并打印结果
        println("Done")
    }
    

    输出结果为:

    [DefaultDispatcher-worker-1] Emitting 1
    [main] Collected 1
    [DefaultDispatcher-worker-1] Emitting 2
    [main] Collected 2
    [DefaultDispatcher-worker-1] Emitting 3
    [main] Collected 3
    Done
    Done
    [DefaultDispatcher-worker-2] Emitting 1
    [main] Collected 1
    [DefaultDispatcher-worker-2] Emitting 2
    [main] Collected 2
    [DefaultDispatcher-worker-2] Emitting 3
    [main] Collected 3
    

    可以看到,flowOn操作符将发射操作放在了DefaultDispatcher线程中执行,而collect操作仍然在主线程中执行。launchIn操作符将收集操作放在了一个单独的协程中执行,而不阻塞主线程。

    Flow示意图

    相关文章

      网友评论

          本文标题:Kotlin协程之Flow的使用与原理

          本文链接:https://www.haomeiwen.com/subject/dkpmmdtx.html