美文网首页
【Koltin Flow(三)】Flow操作符之中间操作符(三)

【Koltin Flow(三)】Flow操作符之中间操作符(三)

作者: MakerGaoGao | 来源:发表于2022-08-03 15:53 被阅读0次

目录

【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
【Koltin Flow(四)】Flow背压
【Koltin Flow(五)】SharedFlow及StateFlow

前言

  1. 本篇主要介绍中间操作符的功能性操作符以及一些其他操作符。
  2. 如果对其他的操作符或者flow基本知识不太了解,可参考目录的其他篇内容作为参考。

功能操作符

1. retry、retryWhen retry为retryWhen的简化版本,可设置重试次数,以及在闭包内重试开关。

retryWhen控制重试,两个回调参数cause为发生的异常,attempt为当前重试下标,从0开始。

代码如下:
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry(2).catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retry(2)  $it")
            }
            index = 0
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry {
                it is RuntimeException
            }.catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retry{}  $it")
            }


            index = 0
            flow<Int> {
                if (index < 2) {
                    index++
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retryWhen { cause, attempt ->
                Log.d(TAG.TAG, "cause is $cause,attempt is $attempt")
                cause is RuntimeException
            } .catch {
                Log.e(TAG.TAG, "ex is $it")
            }.collect {
                Log.d(TAG.TAG, "retryWhen  $it")
            }

日志如下:
2022-08-02 10:26:55.301 4775-4801/edu.test.demo D/Test-TAG: retry(2)  100
2022-08-02 10:26:55.302 4775-4801/edu.test.demo D/Test-TAG: retry{}  100
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: retryWhen  100
分析:
  • 可以看出虽然在一定条件会抛出异常,但是100这个值都提交成功了,这就是重试retry的作用。
  • retry的次数和闭包返回值可以同时设置,两个值为并列关系,如果一个不满足则不会重试,次数的默认值为Int.MAX_VALUE,闭包的返回值默认为true,所以我们也可以不设置值,直接调用retry()也可以实现重试的效果。
  • retryWhen和retry一致,闭包返回true则重试,返回false则不再重试。
2. cancellable 设置了之后则cancel()取消了flow值得发送,但是有个特殊情况,asFlow的时候需要这个cancellable设置,但是flow{}直接创建出来的flow则不需要设置,原因在下面的分析中看源码说明。
代码如下:

代码1

            (1..10).asFlow().cancellable().catch {
                Log.e(TAG.TAG,"ex is $it")
            }.collect {
                if (it == 5){
                    cancel()
                }
                Log.d(TAG.TAG, " (1..10).asFlow() cancellable $it")
            }

代码2

         flow {
                repeat(10){
                    emit(it)
                }
            }.collect {
                if (it == 5){
                    cancel()
                }
                Log.d(TAG.TAG, "flow{} cancellable $it")
            }
日志如下:

日志1

2022-08-02 11:03:27.029 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 1
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 2
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 3
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 4
2022-08-02 11:03:27.035 6421-6448/edu.test.demo D/Test-TAG:  (1..10).asFlow() cancellable 5

日志2

2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 0
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 1
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 2
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 3
2022-08-02 11:10:51.502 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 4
2022-08-02 11:10:51.505 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 5
分析:
  • 可以看出asFlow在设置了cancellable之后,可以正常取消,如果不设置则会打印出全部的值。
  • flow{}则不受影响,不设置cancellable也可以正常取消。
  • 原因分析如下:
  • asFlow生成的是Flow<T>,调用的方法是unsafeFlow,看名字就可以看出一些端倪,unsafe,但是点开源码如下:
/**
 * Creates a flow that produces values from the range.
 */
public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

不是unsafeFlow呀,直接也是flow呀,这里注意导入:

import kotlinx.coroutines.flow.internal.unsafeFlow as flow

看出来了吧,是unsafeFlow...,到这里asFlow就解释的差不多了。

  • 在看下flow的源码:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

SafeFlow,是不是从名字也可以看出一些端倪,如果不清楚在继续往下看:

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T>{
    ......
}

看到了吧,CancellableFlow,可取消的flow,再看

/**
 * Internal marker for flows that are [cancellable].
 */
internal interface CancellableFlow<out T> : Flow<T>

是不是更清楚了,flow{},生成的本身就是CancellableFlow,到此处flow{},也解释的差不多了。

  • 另外再看下cancellable的源码:
public fun <T> Flow<T>.cancellable(): Flow<T> =
    when (this) {
        is CancellableFlow<*> -> this // Fast-path, already cancellable
        else -> CancellableFlowImpl(this)
    }

两种情况,一种本身就是CancellableFlow,直接返回自己,另外一种,则创建CancellableFlowImpl,再看看CancellableFlowImpl:

private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        flow.collect {
            currentCoroutineContext().ensureActive()
            collector.emit(it)
        }
    }
}

可以看到最后得出的也是CancellableFlow。

3. flowOn flowOn用于切换flow上游的执行线程
flowOn的取值共有四种:Dispatchers.IO、Dispatchers.Unconfined、Dispatchers.Main、Dispatchers.Default
  • Dispatchers.IO IO调度器,主要用于阻塞耗时操作。
  • Dispatchers.Unconfined 此调度器不改变线程,启动时在启动线程执行,恢复时在恢复线程执行。
  • Dispatchers.Main 主调度器 设置之后上游会运行在主线程。
  • Dispatchers.Default 默认调度器 不设置情况下采用此调度器,默认为cpu密集计算调度。
  • 这里我们看下Main和Default的情况多下对比
代码如下:
            flow {
                Log.d(TAG.TAG,"Default-emit current is ${Thread.currentThread().name}")
                emit(10)
            }.flowOn(Dispatchers.Default).collect {
                Log.d(TAG.TAG,"Default-collect current is ${Thread.currentThread().name}")
            }

            flow {
                Log.d(TAG.TAG,"Main-emit current is ${Thread.currentThread().name}")
                emit(10)
            }.flowOn(Dispatchers.Main).collect {
                Log.d(TAG.TAG,"Main-collect current is ${Thread.currentThread().name}")
            }
日志如下:
2022-08-02 11:44:26.978 7102-7128/edu.test.demo D/Test-TAG: Default-emit current is DefaultDispatcher-worker-1
2022-08-02 11:44:26.985 7102-7128/edu.test.demo D/Test-TAG: Default-collect current is DefaultDispatcher-worker-1
2022-08-02 11:44:27.115 7102-7102/edu.test.demo D/Test-TAG: Main-emit current is main
2022-08-02 11:44:27.118 7102-7128/edu.test.demo D/Test-TAG: Main-collect current is DefaultDispatcher-worker-1
分析:
  • 可以看出这个在设置了Main之后emit执行在main线程。
  • 其余的详细对比请大家参考Dispatchers进行关注。
4. buffer、conflate 这两个操作符主要处理背压的问题,本篇这里不做展开,在背压的部分会展开说明。

其他操作符

1. produceIn、receiveAsFlow、consumeAsFlow flow和channel之间转换相关,后续进行说明。
2. asSharedFlow、asStateFlow、shareIn、stateIn SharedFlow和StateFlow相关,后续进行说明。

总结

  • 本篇主要介绍了一些功能性操作符,如retry等。
  • buffer、conflate 这两个操作符主要和背压部分相关,在背压的部分作以展开说明。
  • 其他操作符,主要做了下分类说明,因为牵扯到其他大块的内容,在其他的部分作以补充。

相关文章

网友评论

      本文标题:【Koltin Flow(三)】Flow操作符之中间操作符(三)

      本文链接:https://www.haomeiwen.com/subject/vrtewrtx.html