一个案例让你秒懂kotlin flow原理

作者: 唠嗑008 | 来源:发表于2023-06-06 17:41 被阅读0次

    前言

    都2023年了,Kotlin协程和Flow大家都用的很熟了吧。Kotlin Flow 的主要作用是简化异步数据处理的开发,提高代码的可读性和可维护性,并提高应用程序的性能和效率。

    看一个demo,然后思考2个问题

    runBlocking {
            flow {
                println("wait exe emit 1")
                emit(1)
                println("wait exe emit 2")
                emit(2)
            }.collect {
                println("collect value : $it")
            }
        }
    

    执行结果

    wait exe emit 1
    collect value : 1
    wait exe emit 2
    collect value : 2
    

    从打印结果可以看出,flow代码块中的emit方法和collect代码块中的print是交替执行的。另外,如果不调用collect方法,是没有任何打印的。

    再来看一个改造的demo

    runBlocking {
             flow {
                emit(1)
                emit(2)
            }.collect(object :FlowCollector<Int>{
                override suspend fun emit(value: Int) {
                    println(value)
                }
            })
        }
    

    demo2和demo1从执行结果来看是一样的,区别是把collect方法从lambda变成了接口实现的方式。

    接下来,我们就思考并解决如下问题

    • flow,collect的执行顺序为什么是这样的?
    • 为什么必须调用collect方法才能执行flow代码块?

    源码解析

    Flow对象的创建

    flow{ }的实现

    //Creates a _cold_ flow from the given suspendable [block].
    public fun <T> flow(@BuilderInference block: suspend 
    FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    

    可以看到flow()里面需要传入了FlowCollector的扩展方法,同时会返回一个Flow对象,Flow就是个接口。

    这个FlowCollector接口是不是很熟悉,上面demo2中collect参数就是它。

    public interface FlowCollector<in T> {
    
        /**
         * Collects the value emitted by the upstream.
         * This method is not thread-safe and should not be invoked concurrently.
         */
        public suspend fun emit(value: T)
    }
    

    这个接口的作用是:下游收集上游发出的值

    再来看下Flow接口

    public interface Flow<out T> {
        /**
         * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
         * This method should never be implemented or used directly.
         *
         * The only way to implement the `Flow` interface directly is to extend [AbstractFlow].
         * To collect it into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
         * should be used. Such limitation ensures that the context preservation property is not violated and prevents most
         * of the developer mistakes related to concurrency, inconsistent flow dispatchers and cancellation.
         */
        @InternalCoroutinesApi
        public suspend fun collect(collector: FlowCollector<T>)
    }
    

    它只有一个collect方法,作用是:收集上游emit方法发射的数据

    大家先记住这2个接口,后面流程都是围绕这2个接口来完成的。

    上面flow方法源码提到了,会创建一个SafeFlow对象

    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
    //3.调用block
            collector.block()
        }
    }
    
    public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    
        @InternalCoroutinesApi
    //1.调用collect方法
        public final override suspend fun collect(collector: FlowCollector<T>) {
            val safeCollector = SafeCollector(collector, coroutineContext)
            try {
    //2.调用抽象方法collectSafely
                collectSafely(safeCollector)
            } finally {
                safeCollector.releaseIntercepted()
            }
        }
    
    
        public abstract suspend fun collectSafely(collector: FlowCollector<T>)
    }
    

    这段代码就是我们理清楚文章开头提出的2个疑问的关键所在,下面分析下。

    demo中的flow{ }.collect{ }可以拆成2部分,先创建SafeFlow对象,它是继承了AbstractFlow这个抽象类,然后调用它的collect方法,关键流程就是上面注释中的3个步骤。

    可以看出最后是调用了block(),那这个block又是哪里来的呢?还记得flow{ }的实现吗?

    public fun <T> flow(@BuilderInference block: suspend 
    FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
    

    这个block就是flow方法的参数这个block啊,它是FlowCollector的一个扩展函数,这个block其实就是我们flow { }大括号中的代码块啊,这个就是一个lambda表达式的语法而已。

    所以,调用collect方法,最终是执行了flow { }中的代码块。现在能理解为什么必须要调用collect这个收集方法才能触发上游的数据发射了吧,这也解释了为什么flow{}是冷流,因为必须要有收集者才能触发上游的数据发射。

    flow { }代码块中每次调用emit方法,会执行到collect{ }中的代码块,实际上就是执行的FlowCollector这个接口的emit方法啊,只不过collect{ }用了lambda来简化而已。

    用一张图来总结


    image.png

    相关文章

      网友评论

        本文标题:一个案例让你秒懂kotlin flow原理

        本文链接:https://www.haomeiwen.com/subject/ousgedtx.html