美文网首页
【Koltin Flow(三)】Flow操作符之中间操作符(三)

【Koltin Flow(三)】Flow操作符之中间操作符(三)

作者: MakerGaoGao | 来源:发表于2022-08-03 15:53 被阅读0次

    目录

    【Koltin Flow(一)】五种创建flow的方式
    【Koltin Flow(二)】Flow操作符之末端操作符
    【Koltin Flow(三)】Flow操作符之中间操作符(一)
    【Koltin Flow(三)】Flow操作符之中间操作符(二)
    【Koltin Flow(三)】Flow操作符之中间操作符(三)
    【Koltin Flow(四)】Flow背压
    【Koltin Flow(五)】SharedFlow及StateFlow

    前言

    1. 本篇主要介绍中间操作符的功能性操作符以及一些其他操作符。
    2. 如果对其他的操作符或者flow基本知识不太了解,可参考目录的其他篇内容作为参考。

    功能操作符

    1. retry、retryWhen retry为retryWhen的简化版本,可设置重试次数,以及在闭包内重试开关。

    retryWhen控制重试,两个回调参数cause为发生的异常,attempt为当前重试下标,从0开始。

    代码如下:
                flow<Int> {
                    if (index < 2) {
                        index++
                        throw RuntimeException("runtime exception index $index")
                    }
                    emit(100)
                }.retry(2).catch {
                    Log.e(TAG.TAG, "ex is $it")
                }.collect {
                    Log.d(TAG.TAG, "retry(2)  $it")
                }
                index = 0
                flow<Int> {
                    if (index < 2) {
                        index++
                        throw RuntimeException("runtime exception index $index")
                    }
                    emit(100)
                }.retry {
                    it is RuntimeException
                }.catch {
                    Log.e(TAG.TAG, "ex is $it")
                }.collect {
                    Log.d(TAG.TAG, "retry{}  $it")
                }
    
    
                index = 0
                flow<Int> {
                    if (index < 2) {
                        index++
                        throw RuntimeException("runtime exception index $index")
                    }
                    emit(100)
                }.retryWhen { cause, attempt ->
                    Log.d(TAG.TAG, "cause is $cause,attempt is $attempt")
                    cause is RuntimeException
                } .catch {
                    Log.e(TAG.TAG, "ex is $it")
                }.collect {
                    Log.d(TAG.TAG, "retryWhen  $it")
                }
    
    
    日志如下:
    2022-08-02 10:26:55.301 4775-4801/edu.test.demo D/Test-TAG: retry(2)  100
    2022-08-02 10:26:55.302 4775-4801/edu.test.demo D/Test-TAG: retry{}  100
    2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
    2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
    2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: retryWhen  100
    
    分析:
    • 可以看出虽然在一定条件会抛出异常,但是100这个值都提交成功了,这就是重试retry的作用。
    • retry的次数和闭包返回值可以同时设置,两个值为并列关系,如果一个不满足则不会重试,次数的默认值为Int.MAX_VALUE,闭包的返回值默认为true,所以我们也可以不设置值,直接调用retry()也可以实现重试的效果。
    • retryWhen和retry一致,闭包返回true则重试,返回false则不再重试。
    2. cancellable 设置了之后则cancel()取消了flow值得发送,但是有个特殊情况,asFlow的时候需要这个cancellable设置,但是flow{}直接创建出来的flow则不需要设置,原因在下面的分析中看源码说明。
    代码如下:

    代码1

                (1..10).asFlow().cancellable().catch {
                    Log.e(TAG.TAG,"ex is $it")
                }.collect {
                    if (it == 5){
                        cancel()
                    }
                    Log.d(TAG.TAG, " (1..10).asFlow() cancellable $it")
                }
    

    代码2

             flow {
                    repeat(10){
                        emit(it)
                    }
                }.collect {
                    if (it == 5){
                        cancel()
                    }
                    Log.d(TAG.TAG, "flow{} cancellable $it")
                }
    
    日志如下:

    日志1

    2022-08-02 11:03:27.029 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 1
    2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 2
    2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 3
    2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 4
    2022-08-02 11:03:27.035 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 5
    

    日志2

    2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 0
    2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 1
    2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 2
    2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 3
    2022-08-02 11:10:51.502 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 4
    2022-08-02 11:10:51.505 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 5
    
    分析:
    • 可以看出asFlow在设置了cancellable之后,可以正常取消,如果不设置则会打印出全部的值。
    • flow{}则不受影响,不设置cancellable也可以正常取消。
    • 原因分析如下:
    • asFlow生成的是Flow<T>,调用的方法是unsafeFlow,看名字就可以看出一些端倪,unsafe,但是点开源码如下:
    /**
     * Creates a flow that produces values from the range.
     */
    public fun IntRange.asFlow(): Flow<Int> = flow {
        forEach { value ->
            emit(value)
        }
    }
    

    不是unsafeFlow呀,直接也是flow呀,这里注意导入:

    import kotlinx.coroutines.flow.internal.unsafeFlow as flow
    

    看出来了吧,是unsafeFlow...,到这里asFlow就解释的差不多了。

    • 在看下flow的源码:
    public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    
    // Named anonymous object
    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
    

    SafeFlow,是不是从名字也可以看出一些端倪,如果不清楚在继续往下看:

    public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T>{
        ......
    }
    

    看到了吧,CancellableFlow,可取消的flow,再看

    /**
     * Internal marker for flows that are [cancellable].
     */
    internal interface CancellableFlow<out T> : Flow<T>
    

    是不是更清楚了,flow{},生成的本身就是CancellableFlow,到此处flow{},也解释的差不多了。

    • 另外再看下cancellable的源码:
    public fun <T> Flow<T>.cancellable(): Flow<T> =
        when (this) {
            is CancellableFlow<*> -> this // Fast-path, already cancellable
            else -> CancellableFlowImpl(this)
        }
    

    两种情况,一种本身就是CancellableFlow,直接返回自己,另外一种,则创建CancellableFlowImpl,再看看CancellableFlowImpl:

    private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            flow.collect {
                currentCoroutineContext().ensureActive()
                collector.emit(it)
            }
        }
    }
    

    可以看到最后得出的也是CancellableFlow。

    3. flowOn flowOn用于切换flow上游的执行线程
    flowOn的取值共有四种:Dispatchers.IO、Dispatchers.Unconfined、Dispatchers.Main、Dispatchers.Default
    • Dispatchers.IO IO调度器,主要用于阻塞耗时操作。
    • Dispatchers.Unconfined 此调度器不改变线程,启动时在启动线程执行,恢复时在恢复线程执行。
    • Dispatchers.Main 主调度器 设置之后上游会运行在主线程。
    • Dispatchers.Default 默认调度器 不设置情况下采用此调度器,默认为cpu密集计算调度。
    • 这里我们看下Main和Default的情况多下对比
    代码如下:
                flow {
                    Log.d(TAG.TAG,"Default-emit current is ${Thread.currentThread().name}")
                    emit(10)
                }.flowOn(Dispatchers.Default).collect {
                    Log.d(TAG.TAG,"Default-collect current is ${Thread.currentThread().name}")
                }
    
                flow {
                    Log.d(TAG.TAG,"Main-emit current is ${Thread.currentThread().name}")
                    emit(10)
                }.flowOn(Dispatchers.Main).collect {
                    Log.d(TAG.TAG,"Main-collect current is ${Thread.currentThread().name}")
                }
    
    日志如下:
    2022-08-02 11:44:26.978 7102-7128/edu.test.demo D/Test-TAG: Default-emit current is DefaultDispatcher-worker-1
    2022-08-02 11:44:26.985 7102-7128/edu.test.demo D/Test-TAG: Default-collect current is DefaultDispatcher-worker-1
    2022-08-02 11:44:27.115 7102-7102/edu.test.demo D/Test-TAG: Main-emit current is main
    2022-08-02 11:44:27.118 7102-7128/edu.test.demo D/Test-TAG: Main-collect current is DefaultDispatcher-worker-1
    
    分析:
    • 可以看出这个在设置了Main之后emit执行在main线程。
    • 其余的详细对比请大家参考Dispatchers进行关注。
    4. buffer、conflate 这两个操作符主要处理背压的问题,本篇这里不做展开,在背压的部分会展开说明。

    其他操作符

    1. produceIn、receiveAsFlow、consumeAsFlow flow和channel之间转换相关,后续进行说明。
    2. asSharedFlow、asStateFlow、shareIn、stateIn SharedFlow和StateFlow相关,后续进行说明。

    总结

    • 本篇主要介绍了一些功能性操作符,如retry等。
    • buffer、conflate 这两个操作符主要和背压部分相关,在背压的部分作以展开说明。
    • 其他操作符,主要做了下分类说明,因为牵扯到其他大块的内容,在其他的部分作以补充。

    相关文章

      网友评论

          本文标题:【Koltin Flow(三)】Flow操作符之中间操作符(三)

          本文链接:https://www.haomeiwen.com/subject/vrtewrtx.html