自上而下,关注整体思路,文章不考虑更多细节
//flow示例
CoroutineScope(Dispatchers.Default).launch {
flow {
//这部分就是collector
(1..a).forEach{
delay(1000)
emit(it + 1)
}
}.collect {
Log.e("flow collect"," $it")
}
}
拆解
class FlowCollectorImpl<Int>: FlowCollector<Int> {
suspend fun block(){
(1..a).forEach {
// 每隔 1 秒发射 1 个数字
delay(1000)
emit(it + 1)
}
}
override suspend fun emit(value: Int) {
Log.e("flow collect"," $it")// 打印数字
}
}
//源码中的内容示意
private class Flow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block() //引擎点火!
}
}
如果之前不太明白这点的通过上面拆解代码应该可以恍然大悟
部分源码
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
public fun <T> flow(
@BuilderInference block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : kotlinx.coroutines.flow.Flow<T>,
CancellableFlow<T> {
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
//相当于调用上面拆解的block(),block()中调用了emit(),开始发射并收集数据
}
}
flow{}后面的lambda其实相当于就是FlowCollector的一个扩展函数,
collect{ }后面的lambda就是emit方法的实现。
调用collect之后,会调用collector.block(),而block()调用了emit(),从而开始打印数据。
Flow的filter、map等方法原理和关键源码:
核心原理:过滤本质是依次收集原flow数据后按设定条件过滤或转换后生成新的flow对象,发射新的转换后的数据!
filter和map方法都是使用transform()实现,而transform()其实就是接收了上游数据后,再生成新的flow发给下游。
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
if (predicate(value)) return@transform emit(value)
}
public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
上面的源码中,会生成一个新的Flow对象,而该Flow对象的数据源(收集者)就是从原Flow数据中collect的数据。然后经过转换,再次发射出去,而这个新Flow就是下游即将接收的经过筛选或转换后的新数据流。
看上面源码中的其中部分
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
这部分就是收集原flow中的数据,然后按转换条件去发射新的数据。(transform方法中会执行emit(value) )。
冷流的设计核心部分就是依靠函数参数
这一特性来实现的(函数参数大多数情况为lambda表达式),函数参数是kotlin最具创造性和颠覆性的特性之一,也是源码中很多复杂方法看起来难以理解的原因之一。函数参数就是一个死参数,开发者可以直接执行它(),或者不去执行而是继续向下传递该函数参数,如果你不主动去调用它(),它是不会执行它方法体内的代码的。比如上面案例源码:
flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
生成新的flow时,虽然其内部调用了collect{},但是其并没有真正执行(它重写了emit()),只是作为一个参数继续在flow构造内部向下传递该参数,最后在
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
上面中的collectSafely()中才真正被执行,上面是使用函数参数(FlowCollector的扩展函数,这个扩展函数就是生成flow{}时其内的lambda表达式)重写了collectSafely()!而collectSafely()是在collect()中执行的,也就是开始收集时才执行的,这就形成了闭环。这也是冷流的‘冷’原因所在,前期的工作是在“造车”,最后的执行才“点火启动”。
这一特性,使得kotlin拥有了无与伦比的功能构造能力。方法可以先通过传递函数参数来构造功能的整体架构,构造完成之后最后去执行来实现整个功能的运转。
如有问题,欢迎各位指正,感谢
网友评论