美文网首页Kotlin
[译] Kotlin Asynchronous Flow - 异

[译] Kotlin Asynchronous Flow - 异

作者: 两三行代码 | 来源:发表于2019-10-15 23:12 被阅读0次

     异步挂起函数能够返回单一值,那么我们如何返回多个异步计算的值呢?而这个就是Kotlin Flow需要解决地。

    Representing multiple values

     在kotlin,多个值可以由Collections表示。

    fun foo(): List<Int> = listOf(1, 2, 3)
     
    fun main() {
        foo().forEach { value -> println(value) } 
    }
    

    以上代码输出如下:

    1
    2
    3
    

    Sequences

     如果我们使用CPU消费型阻塞代码生产numbers,我们可以使用Sequences表示这些numbers。

    fun foo(): Sequence<Int> = sequence { // sequence builder
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it
            yield(i) // yield next value
        }
    }
    
    fun main() {
        foo().forEach { value -> println(value) } 
    }
    

     以上代码会每隔100ms,打印出一个数字。

    Suspending functions

     以上代码会阻塞主线程,我们可以给函数添加suspend修饰符来实现异步计算。

    suspend fun foo(): List<Int> {
        delay(1000) // pretend we are doing something asynchronous here
        return listOf(1, 2, 3)
    }
    
    fun main() = runBlocking<Unit> {
        foo().forEach { value -> println(value) } 
    }
    

     使用List<Int>返回类型意味着,我们需要一次性返回所有值。为了表示异步计算的值的流,我们可以使用Flow<Int>,就像之前使用Sequence<Int>同步计算值一样。

    fun foo(): Flow<Int> = flow { // flow builder
        for (i in 1..3) {
            delay(100) // pretend we are doing something useful here
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> {
        // Launch a concurrent coroutine to check if the main thread is blocked
        launch {
            for (k in 1..3) {
                println("I'm not blocked $k")
                delay(100)
            }
        }
        // Collect the flow
        foo().collect { value -> println(value) } 
    }
    

    输出如下:

    I'm not blocked 1
    1
    I'm not blocked 2
    2
    I'm not blocked 3
    3
    

    Flow和之前例子的差别:

    • Flow 类型 builder 函数称为 flow
    • flow{}内部代码是可以挂起的
    • 函数 foo()不再需要suspend修饰符
    • 使用emit函数发射值
    • 使用collect函数收集值

    我们可以使用Thread.Sleep 来替换 delay,那么当前Main Thread就会被阻塞

    Flows are cold

     Flow是和sequence一样的code stream,在flow内的代码块只有到flow开始被collected时才开始运行;

         
    fun foo(): Flow<Int> = flow { 
        println("Flow started")
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        println("Calling foo...")
        val flow = foo()
        println("Calling collect...")
        flow.collect { value -> println(value) } 
        println("Calling collect again...")
        flow.collect { value -> println(value) } 
    }
    

    输出如下:

    Calling foo...
    Calling collect...
    Flow started
    1
    2
    3
    Calling collect again...
    Flow started
    1
    2
    3
    

     这个是返回flow的foo()函数不需要被标记为suspend的关键理由.foo()函数会马上返回不会有任何等待,flow每次都是在collect调用之后才开始执行,这就是为什么我们在调用collect之后才打印出来 "Flow started"。

    Flow cancellation

     Flow坚持使用通用的协作式协程取消方式。flow底层并没有采用附加的取消点。对于取消这是完全透明的。和往常一样,如果flow是在一个挂起函数内被挂起了,那么flow collection是可以被取消的,并且在其他情况下是不能被取消的。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
            
    fun foo(): Flow<Int> = flow { 
      for (i in 1..3) {
          delay(100)          
          println("Emitting $i")
          emit(i)
      }
    }
    
    fun main() = runBlocking<Unit> {
      withTimeoutOrNull(250) { // Timeout after 250ms 
          foo().collect { value -> println(value) } 
      }
      println("Done")
    }
    

    输出如下:

    Emitting 1
    1
    Emitting 2
    2
    Done
    

    Flow builders

     在上述例子中,flow { }构造者是最简单的。以下还有更简单的实现:

    • flowOf定义了一个flow用来emit一个固定的集合;
    • 各种collections和sequences都可以通过asFlow()函数来转换为flows;
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking<Unit> {
        // Convert an integer range to a flow
        (1..3).asFlow().collect { value -> println(value) } 
    }
    

    Intermediate flow operators

     Flows可以通过操作符来进行转换,就如同你使用collections和sequences一样。中间操作符用来应用于上游flow任何产生下游flow。这些操作符都是冷启动的,就像flows一样。对于这些操作符的调用也都不是挂起函数。它工作很快,返回新的转换的flow的定义。
     基础操作符也有着和map和filter类似的名字。和sequences一个很重要的差别是在这些操作符内部可以调用挂起函数。
     举个例子,一个请求flow可以通过map操作符映射为result,即便这个请求是在一个挂起函数内需要长时间的运行。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
              
    suspend fun performRequest(request: Int): String {
        delay(1000) // imitate long-running asynchronous work
        return "response $request"
    }
    
    fun main() = runBlocking<Unit> {
        (1..3).asFlow() // a flow of requests
            .map { request -> performRequest(request) }
            .collect { response -> println(response) }
    }
    

    输出如下:

    response 1
    response 2
    response 3
    

    Transform operator

     在flow转换操作符中,最经常使用的就是transform,它可以用来模拟简单转换,就和 mapfilter一样,也可以用来实现更加复杂地转换。可以每次emit任意数量地任意值。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    suspend fun performRequest(request: Int): String {
        delay(1000) // imitate long-running asynchronous work
        return "response $request"
    }
    
    fun main() = runBlocking<Unit> {
        (1..3).asFlow() // a flow of requests
            .transform { request ->
                emit("Making request $request") 
                emit(performRequest(request)) 
            }
            .collect { response -> println(response) }
    }
    
    Making request 1
    response 1
    Making request 2
    response 2
    Making request 3
    response 3
    

    Size-limiting operators

     Size-limiting 中间操作符会比如take会取消flow继续执行,当设置地limit已经达到了设定值。协程取消总是会抛出一个异常,所以所有的资源管理函数对于取消操作都会添加比如try{}finally{}代码块。

    import kotlinx.coroutines.flow.*
    
    fun numbers(): Flow<Int> = flow {
        try {                          
            emit(1)
            emit(2) 
            println("This line will not execute")
            emit(3)    
        } finally {
            println("Finally in numbers")
        }
    }
    
    fun main() = runBlocking<Unit> {
        numbers() 
            .take(2) // take only the first two
            .collect { value -> println(value) }
    }
    
    1
    2
    Finally in numbers
    

    Terminal flow operators

     对于flows的末端操作符都是开始flow采集的挂起函数。collect操作符是最基础的,除此以外还有其他操作符:

    • toListtoSet.可以作集合转换
    • 操作符可以获取第一个值并且确保flow只会emit一个值
    • 使用reducefold 把flow 简化合并为一个值
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking<Unit> {
    
       val sum = (1..5).asFlow()
           .map { it * it } // squares of numbers from 1 to 5                           
           .reduce { a, b -> a + b } // sum them (terminal operator)
       println(sum)     
    }
    
    55
    

    Flows are sequential

     每一个flow集合都是顺序执行,除非应用了某个特定地针对多个flow执行地操作符。每一个值都是经过中间操作符,从上游到达下游,最终到达末端操作符。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking<Unit> {
    
        (1..5).asFlow()
            .filter {
                println("Filter $it")
                it % 2 == 0              
            }              
            .map { 
                println("Map $it")
                "string $it"
            }.collect { 
                println("Collect $it")
            }                      
    }
    
    Filter 1
    Filter 2
    Map 2
    Collect string 2
    Filter 3
    Filter 4
    Map 4
    Collect string 4
    Filter 5
    

    Flow context

     flow collection总是会在调用者协程发生。举个例子,如果有一个foo flow,那么以下代码会在作者指定地上下文中执行,而不用去看foo flow的具体执行细节。

    withContext(context) {
        foo.collect { value ->
            println(value) // run in the specified context 
        }
    }
    

     因此,flow{}内的代码是运行在collector指定的上下文中。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
               
    fun foo(): Flow<Int> = flow {
        log("Started foo flow")
        for (i in 1..3) {
            emit(i)
        }
    }  
    
    fun main() = runBlocking<Unit> {
        foo().collect { value -> log("Collected $value") } 
    }      
    
    [main @coroutine#1] Started foo flow
    [main @coroutine#1] Collected 1
    [main @coroutine#1] Collected 2
    [main @coroutine#1] Collected 3
    

     因为foo().collect是在主线程中被调用的。foo的flow内部代码也是在主线程中执行。这是个完美地实现,解决快速运行或者异步代码,不用关心执行环境和阻塞调用者线程。

    Wrong emission withContext

     然而,长时间运行CPU消费型任务需要在Dispatchers.Default中执行,UI更新代码需要在Dispatchers.Main中执行。通常,withContext用来切换kotlin 协程的上下文。但是flow{}内部的代码必须要尊重上下文保留属性,并且不允许从不同的上下文emit值。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
                         
    fun foo(): Flow<Int> = flow {
       // The WRONG way to change context for CPU-consuming code in flow builder
       kotlinx.coroutines.withContext(Dispatchers.Default) {
           for (i in 1..3) {
               Thread.sleep(100) // pretend we are computing it in CPU-consuming way
               emit(i) // emit next value
           }
       }
    }
    
    fun main() = runBlocking<Unit> {
       foo().collect { value -> println(value) } 
    }   
    
    Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
            Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
            but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
            Please refer to 'flow' documentation or use 'flowOn' instead
        at ...
    

    flowOn operator

     以下展示正确切换flow上下文的方式:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
               
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
    
    fun main() = runBlocking<Unit> {
        foo().collect { value ->
            log("Collected $value") 
        } 
    } 
    
    [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
    [main @coroutine#1] Collected 1
    [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
    [main @coroutine#1] Collected 2
    [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
    [main @coroutine#1] Collected 3
    

    flowOn改变了flow本身的顺序执行上下文。collection发生在协程"coroutine#1"中,emission发生在协程"coroutine#2"并且是运行咋另一个线程中,和collect操作是同时进行地。当必须要为上下文改变CoroutineDispatcher时,flowOn操作符就会为上游flow创建一个新协程。

    Buffering

     在不同协程中运行flow的不同部分,在整体立场上是非常有帮助的,特别是涉及到长时间运行的异步操作。举个例子,当foo()的flow的emission操作比较慢,比如没100ms生产一个一个element,并且collector也比较慢,花费300ms去处理一个元素。让我门看看一个处理三个数字的flow需要多少时间:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.system.*
    
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100) // pretend we are asynchronously waiting 100 ms
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> { 
        val time = measureTimeMillis {
            foo().collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
        }   
        println("Collected in $time ms")
    }
    

    输出如下,整个collection操作需要花费大概1200ms。

    1
    2
    3
    Collected in 1220 ms
    

     我们可以使用 buffer操作符去并发执行foo()方法里的emit代码段,然后顺序执行collection操作。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.system.*
    
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100) // pretend we are asynchronously waiting 100 ms
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> { 
        val time = measureTimeMillis {
            foo()
                .buffer() // buffer emissions, don't wait
                .collect { value -> 
                    delay(300) // pretend we are processing it for 300 ms
                    println(value) 
                } 
        }   
        println("Collected in $time ms")
    }
    

     上述方式会更快生产numbers,因为我们有效创建了处理流程,只需要在第一个数字上等待100ms,然后每个数字都花费300ms去做处理。这样方式会花费大概1000ms。

    1
    2
    3
    Collected in 1071 ms
    

    注意: flowOn操作符使用了相同的缓存机制,但是它必须切换CoroutineDispatcher,但是在这里,我们显示请求了buffer而不是切换执行上下文。

    Conflation

     当一个flow表示操作的部分结果或者操作状态更新,它可能并不需要去处理每一个值,但是需要处理最近的一个值。在这种场景下, conflate操作符可以被用于忽略中间操作符,当collector处理太慢。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.system.*
    
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100) // pretend we are asynchronously waiting 100 ms
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> { 
        val time = measureTimeMillis {
            foo()
                .conflate() // conflate emissions, don't process each one
                .collect { value -> 
                    delay(300) // pretend we are processing it for 300 ms
                    println(value) 
                } 
        }   
        println("Collected in $time ms")
    }
    

     从上面可以看出来,第一个数字会被处理,第二第三个也会被处理,而第二个数字会被合并,只有第三个数字发送给collector进行处理。

    1
    3
    Collected in 758 ms
    

    Processing the latest value

      Conflation是一种对emitter和collector慢处理的一种加速方式。它通过丢弃一些值来做实现。另外一种方式就是通过取消一个慢处理collector然后重启collector接受已经发射的新值。有一族的xxxlatest操作符来执行和xxx操作符同样的基本逻辑。取消emit新值的代码块内的代码。我们把上个例子中的conflate改成collectlatest。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.system.*
    
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100) // pretend we are asynchronously waiting 100 ms
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> { 
        val time = measureTimeMillis {
            foo()
                .collectLatest { value -> // cancel & restart on the latest value
                    println("Collecting $value") 
                    delay(300) // pretend we are processing it for 300 ms
                    println("Done $value") 
                } 
        }   
        println("Collected in $time ms")
    }
    

     因为collectLatest花费来300ms,每次发送一个新值都是100ms。我们看见代码块都是运行在新值上,但是只会以最新值完成。

    Collecting 1
    Collecting 2
    Collecting 3
    Done 3
    Collected in 741 ms
    

    Composing multiple flows

    有不少方式来组合多个flow。

     就像kotlin标准库内的Sequence.zip扩展函数一样,flows有zip操作符来组合不同的flows的值。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking<Unit> { 
    
        val nums = (1..3).asFlow() // numbers 1..3
        val strs = flowOf("one", "two", "three") // strings 
        nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
            .collect { println(it) } // collect and print
    }
    
    1 -> one
    2 -> two
    3 -> three
    

    Combine

     当flow表示操作或者变量的最近值,它可能需要去执行计算根据所依赖的相应的flow的最近的值,并且会重新计算,当上游flow发射新值的时候。相应的操作符族被称作combine

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking<Unit> { 
    
        val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
        val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
        val startTime = System.currentTimeMillis() // remember the start time 
        nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }
    

    使用combine替换zip

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun main() = runBlocking<Unit> { 
    
        val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
        val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
        val startTime = System.currentTimeMillis() // remember the start time 
        nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }
    
    1 -> one at 452 ms from start
    2 -> one at 651 ms from start
    2 -> two at 854 ms from start
    3 -> two at 952 ms from start
    3 -> three at 1256 ms from start
    

    Flattening flows

     Flows表示异步值接收序列,它会很容易地触发请求另一个sequence值。比如,以下例子返回了两个500ms间隔字符串地flow。

    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First") 
        delay(500) // wait 500 ms
        emit("$i: Second")    
    }
    

     现在我们有三个整型的flow,我们调用requestFlow来请求值。

    (1..3).asFlow().map { requestFlow(it) }
    

     现在我们结束flows的每一个flow,需要将其扁平化为一个flow来进行进一步地处理。Collections和sequences有 flattenflatMap操作符,由于flow地异步性质,它会调用不同地flattening地模型,因此,flows有一族的flattening操作符。

    flatMapConcat

     连接模式由flatMapConcatflattenConcat操作符实现。 它们和sequence是最相似的。在收集新值之前,它们会等待内部flow完成,如同下面的例子描述地一样:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First") 
        delay(500) // wait 500 ms
        emit("$i: Second")    
    }
    
    fun main() = runBlocking<Unit> { 
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapConcat { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }
    

    可以看出来 flatMapConcat的顺序性质

    1: First at 121 ms from start
    1: Second at 622 ms from start
    2: First at 727 ms from start
    2: Second at 1227 ms from start
    3: First at 1328 ms from start
    3: Second at 1829 ms from start
    

    flatMapMerge

    另一个flattening mode是并发收集flows并且将合并它们的值为一个单一flow,因此发射地值会尽快被处理。这些由flatMapMergeflattenMerge来实现。它们都有一个可选的concurrency参数来限制并发同时进行收集的flow个数。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First") 
        delay(500) // wait 500 ms
        emit("$i: Second")    
    }
    
    fun main() = runBlocking<Unit> { 
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapMerge { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }
    

    可以看出来flatMapMerge并发特性:

    1: First at 136 ms from start
    2: First at 231 ms from start
    3: First at 333 ms from start
    1: Second at 639 ms from start
    2: Second at 732 ms from start
    3: Second at 833 ms from start
    

    可以看出来flatMapMerge是顺序调用内部代码块(在这个例子中是{ requestFlow(it) } )但是并发收集结果flows,它等价于顺序执行map { requestFlow(it) },然后对于结果调用flattenMerge

    flatMapLatest

     flatMapLatest和collectLatest操作符很像,在"Processing the latest value"章节有介绍,里面有关于"Latest"模式的介绍,只有新flow发射了了新值,那么上个flow就会被取消,由 flatMapLatest实现。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First") 
        delay(500) // wait 500 ms
        emit("$i: Second")    
    }
    
    fun main() = runBlocking<Unit> { 
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapLatest { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }
    
    1: First at 142 ms from start
    2: First at 322 ms from start
    3: First at 425 ms from start
    3: Second at 931 ms from start
    

    flatMapLatest取消了代码快内所有代码(在这个例子中是{ requestFlow(it) })当flow发射新值。在这个特定例子中没有差别,因为对requestFlow的调用是非常快的,非挂起也不能取消。如果我们使用挂起函数,比如delay就会按照期望的来显示。

    Flow exceptions

     如果在emitter内部或者操作符内部抛出一个异常,flow collection也是可以正常完成。也有好几种处理异常的方式。

    Collector try and catch

     在collector内部使用try/catch代码块来处理异常。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> {
        try {
            foo().collect { value ->         
                println(value)
                check(value <= 1) { "Collected $value" }
            }
        } catch (e: Throwable) {
            println("Caught $e")
        } 
    }    
    

     这段代码成功捕捉了 collect 末端操作符内的异常,在抛出异常之后就没有再发送新值。

    Emitting 1
    1
    Emitting 2
    2
    Caught java.lang.IllegalStateException: Collected 2
    

    Everything is caught

     上个例子确实捕捉了emitter,中间操作符,末端操作符内的异常。我们修改以下代码,将emitter发射值通过mapped修改为strings,但是除了异常之外。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<String> = 
        flow {
            for (i in 1..3) {
                println("Emitting $i")
                emit(i) // emit next value
            }
        }
        .map { value ->
            check(value <= 1) { "Crashed on $value" }                 
            "string $value"
        }
    
    fun main() = runBlocking<Unit> {
        try {
            foo().collect { value -> println(value) }
        } catch (e: Throwable) {
            println("Caught $e")
        } 
    }            
    
    Emitting 1
    string 1
    Emitting 2
    Caught java.lang.IllegalStateException: Crashed on 2
    

    Exception transparency

     但是怎么样才能将封装处理emitter代码异常处理逻辑呢?
     Flow必须对异常处理透明,在flow{}内使用try/catch违背了异常处理透明化原则。在上述例子中,保证了从collector抛出异常也能在try/catch内捕获。
     emitter可以使用catch操作符来实现异常透明化处理,运行异常处理封装。catch操作符可以分析异常,根据不同的异常作出相应处理。

    • 可以使用throw再次抛出异常
    • 异常可以在catch内转换为发射值
    • 异常可以被忽略、打印、或者由其他逻辑代码进行处理
       举个例子,我们在catch异常之后再发射一段文本:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<String> = 
        flow {
            for (i in 1..3) {
                println("Emitting $i")
                emit(i) // emit next value
            }
        }
        .map { value ->
            check(value <= 1) { "Crashed on $value" }                 
            "string $value"
        }
    
    fun main() = runBlocking<Unit> {
        foo()
            .catch { e -> emit("Caught $e") } // emit on exception
            .collect { value -> println(value) }
    }
    

    Transparent catch

     catch中间操作符,履行了异常透明化原则,只是捕捉上游异常(仅仅会捕获catch操作符以上的异常,而不会捕获catch以下的异常),如果是在collect{}代码块内的异常则会逃逸。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        foo()
            .catch { e -> println("Caught $e") } // does not catch downstream exceptions
            .collect { value ->
                check(value <= 1) { "Collected $value" }                 
                println(value) 
            }
    } 
    

    Catching declaratively

     我们可以结合 catch操作符属性然后如果要求去处理所有异常的话就可以把 collect函数体内的逻辑转移到onEach并且放到catch操作符前面去。这样的Flow Collection 必须由对collect的调用触发并且collect方法没有参数。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        foo()
            .onEach { value ->
                check(value <= 1) { "Collected $value" }                 
                println(value) 
            }
            .catch { e -> println("Caught $e") }
            .collect()
    }
    

     现在我们可以看见"Caught …"消息打印出来,现在就可以捕获所有异常而不用显示编写try/catch代码块。

    Flow completion

     在flow collection完成之后(正常完成或者异常完成)可能需要执行一个操作,可以通过两种方式来完成: imperative 或 declarative。

    Imperative finally block

     对于try/catch,collector也可以通过使用finally来执行完成操作。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        try {
            foo().collect { value -> println(value) }
        } finally {
            println("Done")
        }
    }  
    
    1
    2
    3
    Done
    

    Declarative handling

     对于declarative方式,flow有一个onCompletion中间操作符,在flow完成collect操作后就会被调用。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        foo()
            .onCompletion { println("Done") }
            .collect { value -> println(value) }
    }
    

     对于 onCompletion的关键性优点则是可以为空的Throwable参数,可以以此判断flow是正常完成还是异常完成,在以下例子中,foo() flow在emit数字1之后抛出来异常。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<Int> = flow {
        emit(1)
        throw RuntimeException()
    }
    
    fun main() = runBlocking<Unit> {
        foo()
            .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
            .catch { cause -> println("Caught exception") }
            .collect { value -> println(value) }
    } 
    

    输入如下

    1
    Flow completed exceptionally
    Caught exception
    

    onCompletion并不像catch一样,它不会处理异常。正如我们上面看见的一样,异常依然向下游流动,被进一步分发到onCompletion操作符,也可以在catch操作符内做处理。

    Upstream exceptions only

     和catch操作符一样,onCompletion只会看见来自上游的异常,不能看见下游异常。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    fun foo(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        foo()
            .onCompletion { cause -> println("Flow completed with $cause") }
            .collect { value ->
                check(value <= 1) { "Collected $value" }                 
                println(value) 
            }
    }
    
    1
    Flow completed with null
    Exception in thread "main" java.lang.IllegalStateException: Collected 2
    

    Imperative versus declarative

     现在我们知道如何去收集flow,处理completion和exceptions以imperative和declarative方式。

    Launching flow

     在一些数据源,使用flows来表示异步事件是非常简单地。我们需要和addEventListener一样的类似处理方式,注册一段代码用来处理新事件和进行进一步的工作。onEach正好能服务这一角色。然而,onEach是一个中间操作符。我们也需要一个末端操作符来收集操作符。其他情况下调用onEach则毫无影响。
     如果我们在 collect末端操作符之前使用onEach,在那之后的代码将会等待直到flow收集完成。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    // Imitate a flow of events
    fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
    
    fun main() = runBlocking<Unit> {
        events()
            .onEach { event -> println("Event: $event") }
            .collect() // <--- Collecting the flow waits
        println("Done")
    } 
    
    Event: 1
    Event: 2
    Event: 3
    Done
    

    launchIn操作符是很方便地。使用collect来替换launchIn,我们就可以在一个独立协程中执行flow采集,所以后面的代码就会马上执行。

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
    // Imitate a flow of events
    fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
    
    fun main() = runBlocking<Unit> {
        events()
            .onEach { event -> println("Event: $event") }
            .launchIn(this) // <--- Launching the flow in a separate coroutine
        println("Done")
    }   
    
    Done
    Event: 1
    Event: 2
    Event: 3
    

      launchIn参数可以指定一个 CoroutineScope,用来表示flow收集的协程所在的作用域。在上述例子中,这个作用域来自于 runBlocking协程构造器,runBlocking作用域会等待内部子作用域完成,在这例子中保持主函数的返回和介绍。
     在实际应用程序中,一个作用域会来自一个有限生命周期的实体。只要这个实体的生命周期终结了,那么相应的作用域也会被取消,取消相应flow的采集工作。在这种方式下,使用onEach { ... }.launchIn(scope)和addEventListener就很类似,然而,没有必要直接调用removeEventListener,因为取消操作和结构化并发会自动完成这个操作。
     请注意,launchIn会返回 Job,在没有取消整个作用域或join时,job可以用来cancel相应的flow收集协程。

    相关文章

      网友评论

        本文标题:[译] Kotlin Asynchronous Flow - 异

        本文链接:https://www.haomeiwen.com/subject/qsujmctx.html