美文网首页Android 知识Kotlin 知识
Kotlin(二十)异步流-操作符<5> 流完成

Kotlin(二十)异步流-操作符<5> 流完成

作者: zcwfeng | 来源:发表于2021-08-06 00:18 被阅读0次
    1. 流完成

    当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。 你可能已经注意到,它可以通过两种方式完成:命令式或声明式。
    命令式 finally 块

    fun simple(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        try {
            simple().collect { value -> println(value) }
        } finally {
            println("Done")
        }
    }  
    
    1
    2
    3
    Done
    

    除了 try/catch 之外,收集器还能使用 finally 块在 collect 完成时执行一个动作。

    声明式处理

    [onCompletion]的主要优点是其 lambda 表达式的可空参数 Throwable 可以用于确定流收集是正常完成还是有异常发生。在下面的示例中 simple 流在发射数字 1 之后抛出了一个异常

    fun simple(): Flow<Int> = flow {
        emit(1)
        throw RuntimeException()
    }
    
    fun main() = runBlocking<Unit> {
        simple()
            .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
            .catch { cause -> println("Caught exception") }
            .collect { value -> println(value) }
    }    
    
    1
    Flow completed exceptionally
    Caught exception
    

    completed 操作符与 catch 不同,它不处理异常。我们可以看到前面的示例代码,异常仍然流向下游。它将被提供给后面的 onCompletion 操作符,并可以由 catch 操作符处理。

    成功完成

    与 [catch] 操作符的另一个不同点是 [onCompletion]能观察到所有异常并且仅在上游流成功完成(没有取消或失败)的情况下接收一个 null 异常。

    fun simple(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        simple()
            .onCompletion { cause -> println("Flow completed with $cause") }
            .collect { value ->
                check(value <= 1) { "Collected $value" }                 
                println(value) 
            }
    }
    
    我们可以看到完成时 cause 不为空,因为流由于下游异常而中止:
    
    1
    Flow completed with java.lang.IllegalStateException: Collected 2
    Exception in thread "main" java.lang.IllegalStateException: Collected 2
     at FileKt$main$1$invokeSuspend$$inlined$collect$1.emit (Collect.kt:135) 
     at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect (SafeCollector.common.kt:115) 
     at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect (SafeCollector.common.kt:114) 
    
    

    命令式还是声明式

    现在我们知道如何收集流,并以命令式与声明式的方式处理其完成及异常情况。 这里有一个很自然的问题是,哪种方式应该是首选的?为什么? 作为一个库,我们不主张采用任何特定的方式,并且相信这两种选择都是有效的, 应该根据自己的喜好与代码风格进行选择。

    1. 启动流

    使用流表示来自一些源的异步事件是很简单的。 在这个案例中,我们需要一个类似 addEventListener 的函数,该函数注册一段响应的代码处理即将到来的事件,并继续进行进一步的处理。onEach 操作符可以担任该角色。 然而,onEach 是一个过渡操作符。我们也需要一个末端操作符来收集流。 否则仅调用 onEach 是无效的。

    如果我们在 onEach 之后使用 collect 末端操作符,那么后面的代码会一直等待直至流被收集:

    // 模仿事件流
    fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
    
    fun main() = runBlocking<Unit> {
        events()
            .onEach { event -> println("Event: $event") }
            .collect() // <--- 等待流收集
        println("Done")
    }  
    
    
    Event: 1
    Event: 2
    Event: 3
    Done
    

    末端操作符可以在这里派上用场。使用 launchIn 替换 collect 我们可以在单独的协程中启动流的收集,这样就可以立即继续进一步执行代码:

    un main() = runBlocking<Unit> {
        events()
            .onEach { event -> println("Event: $event") }
            .launchIn(this) // <--- 在单独的协程中执行流
        println("Done")
    }     
    
    Done
    Event: 1
    Event: 2
    Event: 3
    

    launchIn 必要的参数 CoroutineScope 指定了用哪一个协程来启动流的收集。在先前的示例中这个作用域来自 runBlocking 协程构建器,在这个流运行的时候,runBlocking 作用域等待它的子协程执行完毕并防止 main 函数返回并终止此示例。

    在实际的应用中,作用域来自于一个寿命有限的实体。在该实体的寿命终止后,相应的作用域就会被取消,即取消相应流的收集。这种成对的 onEach { ... }.launchIn(scope) 工作方式就像 addEventListener 一样。而且,这不需要相应的 removeEventListener 函数, 因为取消与结构化并发可以达成这个目的。

    注意,launchIn 也会返回一个 Job,可以在不取消整个作用域的情况下仅取消相应的流收集或对其进行 join

    1. 流取消检测

    为方便起见,[流]构建器对每个发射值执行附加的 [ensureActive]检测以进行取消。 这意味着从 flow { ... } 发出的繁忙循环是可以取消的:

    fun foo(): Flow<Int> = flow { 
        for (i in 1..5) {
            println("Emitting $i") 
            emit(i) 
        }
    }
    
    fun main() = runBlocking<Unit> {
        foo().collect { value -> 
            if (value == 3) cancel()  
            println(value)
        } 
    }
    
    仅得到不超过 3 的数字,在尝试发出 4 之后抛出 
    
    Emitting 1
    1
    Emitting 2
    2
    Emitting 3
    3
    Emitting 4
    Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
     at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1578) 
     at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:287) 
     at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:285) 
    

    参考: http://www.kotlincn.net/docs/reference/coroutines/flow.html#%E5%BC%82%E6%AD%A5%E6%B5%81

    相关文章

      网友评论

        本文标题:Kotlin(二十)异步流-操作符<5> 流完成

        本文链接:https://www.haomeiwen.com/subject/eyzuvltx.html