目录
【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
【Koltin Flow(四)】Flow背压
【Koltin Flow(五)】SharedFlow及StateFlow
前言
- 本篇主要介绍中间操作符的功能性操作符以及一些其他操作符。
- 如果对其他的操作符或者flow基本知识不太了解,可参考目录的其他篇内容作为参考。
功能操作符
1. retry、retryWhen retry为retryWhen的简化版本,可设置重试次数,以及在闭包内重试开关。
retryWhen控制重试,两个回调参数cause为发生的异常,attempt为当前重试下标,从0开始。
代码如下:
flow<Int> {
if (index < 2) {
index++
throw RuntimeException("runtime exception index $index")
}
emit(100)
}.retry(2).catch {
Log.e(TAG.TAG, "ex is $it")
}.collect {
Log.d(TAG.TAG, "retry(2) $it")
}
index = 0
flow<Int> {
if (index < 2) {
index++
throw RuntimeException("runtime exception index $index")
}
emit(100)
}.retry {
it is RuntimeException
}.catch {
Log.e(TAG.TAG, "ex is $it")
}.collect {
Log.d(TAG.TAG, "retry{} $it")
}
index = 0
flow<Int> {
if (index < 2) {
index++
throw RuntimeException("runtime exception index $index")
}
emit(100)
}.retryWhen { cause, attempt ->
Log.d(TAG.TAG, "cause is $cause,attempt is $attempt")
cause is RuntimeException
} .catch {
Log.e(TAG.TAG, "ex is $it")
}.collect {
Log.d(TAG.TAG, "retryWhen $it")
}
日志如下:
2022-08-02 10:26:55.301 4775-4801/edu.test.demo D/Test-TAG: retry(2) 100
2022-08-02 10:26:55.302 4775-4801/edu.test.demo D/Test-TAG: retry{} 100
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: retryWhen 100
分析:
- 可以看出虽然在一定条件会抛出异常,但是100这个值都提交成功了,这就是重试retry的作用。
- retry的次数和闭包返回值可以同时设置,两个值为并列关系,如果一个不满足则不会重试,次数的默认值为Int.MAX_VALUE,闭包的返回值默认为true,所以我们也可以不设置值,直接调用retry()也可以实现重试的效果。
- retryWhen和retry一致,闭包返回true则重试,返回false则不再重试。
2. cancellable 设置了之后则cancel()取消了flow值得发送,但是有个特殊情况,asFlow的时候需要这个cancellable设置,但是flow{}直接创建出来的flow则不需要设置,原因在下面的分析中看源码说明。
代码如下:
代码1
(1..10).asFlow().cancellable().catch {
Log.e(TAG.TAG,"ex is $it")
}.collect {
if (it == 5){
cancel()
}
Log.d(TAG.TAG, " (1..10).asFlow() cancellable $it")
}
代码2
flow {
repeat(10){
emit(it)
}
}.collect {
if (it == 5){
cancel()
}
Log.d(TAG.TAG, "flow{} cancellable $it")
}
日志如下:
日志1
2022-08-02 11:03:27.029 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 1
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 2
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 3
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 4
2022-08-02 11:03:27.035 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 5
日志2
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 0
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 1
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 2
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 3
2022-08-02 11:10:51.502 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 4
2022-08-02 11:10:51.505 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 5
分析:
- 可以看出asFlow在设置了cancellable之后,可以正常取消,如果不设置则会打印出全部的值。
- flow{}则不受影响,不设置cancellable也可以正常取消。
- 原因分析如下:
- asFlow生成的是Flow<T>,调用的方法是unsafeFlow,看名字就可以看出一些端倪,unsafe,但是点开源码如下:
/**
* Creates a flow that produces values from the range.
*/
public fun IntRange.asFlow(): Flow<Int> = flow {
forEach { value ->
emit(value)
}
}
不是unsafeFlow呀,直接也是flow呀,这里注意导入:
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
看出来了吧,是unsafeFlow...,到这里asFlow就解释的差不多了。
- 在看下flow的源码:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
SafeFlow,是不是从名字也可以看出一些端倪,如果不清楚在继续往下看:
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T>{
......
}
看到了吧,CancellableFlow,可取消的flow,再看
/**
* Internal marker for flows that are [cancellable].
*/
internal interface CancellableFlow<out T> : Flow<T>
是不是更清楚了,flow{},生成的本身就是CancellableFlow,到此处flow{},也解释的差不多了。
- 另外再看下cancellable的源码:
public fun <T> Flow<T>.cancellable(): Flow<T> =
when (this) {
is CancellableFlow<*> -> this // Fast-path, already cancellable
else -> CancellableFlowImpl(this)
}
两种情况,一种本身就是CancellableFlow,直接返回自己,另外一种,则创建CancellableFlowImpl,再看看CancellableFlowImpl:
private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
flow.collect {
currentCoroutineContext().ensureActive()
collector.emit(it)
}
}
}
可以看到最后得出的也是CancellableFlow。
3. flowOn flowOn用于切换flow上游的执行线程
flowOn的取值共有四种:Dispatchers.IO、Dispatchers.Unconfined、Dispatchers.Main、Dispatchers.Default
- Dispatchers.IO IO调度器,主要用于阻塞耗时操作。
- Dispatchers.Unconfined 此调度器不改变线程,启动时在启动线程执行,恢复时在恢复线程执行。
- Dispatchers.Main 主调度器 设置之后上游会运行在主线程。
- Dispatchers.Default 默认调度器 不设置情况下采用此调度器,默认为cpu密集计算调度。
- 这里我们看下Main和Default的情况多下对比
代码如下:
flow {
Log.d(TAG.TAG,"Default-emit current is ${Thread.currentThread().name}")
emit(10)
}.flowOn(Dispatchers.Default).collect {
Log.d(TAG.TAG,"Default-collect current is ${Thread.currentThread().name}")
}
flow {
Log.d(TAG.TAG,"Main-emit current is ${Thread.currentThread().name}")
emit(10)
}.flowOn(Dispatchers.Main).collect {
Log.d(TAG.TAG,"Main-collect current is ${Thread.currentThread().name}")
}
日志如下:
2022-08-02 11:44:26.978 7102-7128/edu.test.demo D/Test-TAG: Default-emit current is DefaultDispatcher-worker-1
2022-08-02 11:44:26.985 7102-7128/edu.test.demo D/Test-TAG: Default-collect current is DefaultDispatcher-worker-1
2022-08-02 11:44:27.115 7102-7102/edu.test.demo D/Test-TAG: Main-emit current is main
2022-08-02 11:44:27.118 7102-7128/edu.test.demo D/Test-TAG: Main-collect current is DefaultDispatcher-worker-1
分析:
- 可以看出这个在设置了Main之后emit执行在main线程。
- 其余的详细对比请大家参考Dispatchers进行关注。
4. buffer、conflate 这两个操作符主要处理背压的问题,本篇这里不做展开,在背压的部分会展开说明。
其他操作符
1. produceIn、receiveAsFlow、consumeAsFlow flow和channel之间转换相关,后续进行说明。
2. asSharedFlow、asStateFlow、shareIn、stateIn SharedFlow和StateFlow相关,后续进行说明。
总结
- 本篇主要介绍了一些功能性操作符,如retry等。
- buffer、conflate 这两个操作符主要和背压部分相关,在背压的部分作以展开说明。
- 其他操作符,主要做了下分类说明,因为牵扯到其他大块的内容,在其他的部分作以补充。
网友评论