Kotlin Flow数据流各种操作符演练

作者: 奔跑吧李博 | 来源:发表于2023-11-19 16:54 被阅读0次
    Flow 源码如下
    public interface Flow<out T> {
        public suspend fun collect(collector: FlowCollector<T>)
    }
    
    创建常规 Flow 的常用方式

    flow{...}

        private fun createFlow(): Flow<Int> = flow {
            delay(1000)
            emit(1)
            delay(1000)
            emit(2)
            delay(1000)
            emit(3)
        }
    

    flowOf()

        //flowOf() 构建器定义了一个发射固定值集的流, 使用 flowOf 构建 Flow 不需要显示调用 emit() 发射数据
        //因为在扩展函数内部使用了emit
        private fun createFlow2(): Flow<Int> = flowOf(1, 2, 3)
    

    asFlow()

        //使用 asFlow() 扩展函数,可以将各种集合与序列转换为流,也不需要显示调用 emit() 发射数据
        //因为在扩展函数内部使用了emit
        private fun createFlow3(): Flow<Int> = listOf(1, 2, 3).asFlow()
    
    Flow 是冷流(惰性的)

    在调用末端流操作符(collect 是其中之一)之前, flow{ ... } 中的代码不会执行。我们称之为冷流。

        private fun createFlow(): Flow<Int> = flow {
            Log.i("minfo", "flow started")
            delay(1000)
            emit(1)
            delay(1000)
            emit(2)
            delay(1000)
            emit(3)
        }
    
        fun code() = runBlocking {
            val flow = createFlow()
            Log.i("minfo", "flow collect")
            flow.collect {
                Log.i("minfo", it.toString())
            }
            Log.i("minfo", "flow collect again")
            flow.collect {
                Log.i("minfo", it.toString())
            }
        }
    

    打印结果:
    这里先执行了flow collect,在执行的flow started,先collect才会flow才会执行。

    flow collect
    flow started
    1
    2
    3
    flow collect again
    flow started
    1
    2
    3
    
    Flow 的取消

    流的收集可以在当流在一个可取消的挂起函数(例如 delay)中挂起的时候取消。取消Flow 只需要取消它所在的协程即可。

        fun main() = runBlocking {
            withTimeoutOrNull(250) {
                simple().collect {
                    Log.i("minfo", it.toString())
                }
            }
            Log.i("minfo", "Done")
        }
    

    打印:

    1
    2
    Done
    
    末端流操作符
    collect

    收集上游发送的数据

    reduce

    reduce 类似于 Kotlin 集合中的 reduce 函数,能够对集合进行计算操作。

        fun reduce() = runBlocking {
            val sum = (1..5).asFlow().reduce { a, b ->
                a + b
            }
            Log.i("minfo", sum.toString())
        }
    

    结果:

    15
    
    launchIn

    launchIn 用来在指定的 CoroutineScope 内启动 flow, 需要传入一个参数: CoroutineScope。

    我们希望并行执行两个 Flow,为每个 Flow 单独起一个协程:

        fun launchIn() = runBlocking {
            (1..5).asFlow()
                .onEach { delay(100) }
                .flowOn(Dispatchers.IO)
                .onEach { Log.i("minfo", it.toString()) }
                .launchIn(this)
    
            flowOf("one", "two", "three", "four", "five")
                .onEach { delay(200) }
                .flowOn(Dispatchers.IO)
                .onEach { Log.i("minfo", it.toString()) }
                .launchIn(this)
        }
    

    打印结果:

    1
    one
    2
    3
    two
    4
    5
    three
    four
    five
    
    流是连续的

    Flow 的每次单独收集都是按顺序执行的,除非进行特殊操作的操作符使用多个流。 默认情况下不启动新协程。 从上游到下游每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。

    示例:

        fun influence() = runBlocking {
            (1..5).asFlow()
                .filter {
                    Log.i("minfo", "filter $it")
                    it % 2 == 0
                }
                .map {
                    Log.i("minfo", "map $it")
                    "String $it"
                }
                .collect {
                    Log.i("minfo", "collect $it")
                }
        }
    

    打印结果:

    filter 1
    filter 2
    map 2
    collect String 2
    filter 3
    filter 4
    map 4
    collect String 4
    filter 5
    
    onStart 流启动时/onCompletion 流完成时
        fun onStartCompletion() = runBlocking {
            (1..5).asFlow()
                .onEach { delay(200) }
                .onStart {
                    Log.i("minfo", "onStart")
                }
                .onCompletion {
                    Log.i("minfo", "onCompletion")
                }
                .collect {
                    Log.i("minfo", "$it")
                }
        }
    

    打印结果:

    onStart
    1
    2
    3
    4
    5
    onCompletion
    
    flowOn 切换线程

    Flow 是基于 CoroutineContext 进行线程切换的。因为 Collect 是一个 suspend 函数,必须在 CoroutineScope 中执行,所以响应线程是由 CoroutineContext 决定的。比如,在 Main 线程总执行 collect, 那么响应线程就是 Dispatchers.Main。

    Flows 通过 flowOn 方法来切换线程。

        fun dispatcher() = runBlocking {
            val mDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    
            (1..5).asFlow()
                .onEach {
                    //生产数据
                    Log.i("minfo", "${Thread.currentThread().name} + product: $it")
                }.flowOn(Dispatchers.IO)
                .map {
                    //转换数据
                    Log.i("minfo", "${Thread.currentThread().name} + $it to String")
                    "String: $it"
                }.flowOn(mDispatcher)
                .onCompletion {
                    mDispatcher.close()
                }
                .collect {
                    //消费数据
                    Log.i("minfo", "${Thread.currentThread().name} + collect: + $it")
                }
        }
    

    打印结果:

    DefaultDispatcher-worker-1 + product: 1
    DefaultDispatcher-worker-1 + product: 2
    DefaultDispatcher-worker-1 + product: 3
    DefaultDispatcher-worker-1 + product: 4
    DefaultDispatcher-worker-1 + product: 5
    pool-2-thread-1 + 1 to String
    main + collect: + String: 1
    pool-2-thread-1 + 2 to String
    main + collect: + String: 2
    pool-2-thread-1 + 3 to String
    main + collect: + String: 3
    pool-2-thread-1 + 4 to String
    pool-2-thread-1 + 5 to String
    main + collect: + String: 4
    main + collect: + String: 5
    

    可以看到,发射数据是在 Dispatchers.IO 线程执行的, map 操作时在我们自定义的线程池中进行的,collect 操作在 Dispatchers.Main 线程进行。

    Flow 中间转换操作符

    map

    map 操作符用于 Flow 表示将流中的每个元素进行转换后再发射出来。

        fun map() = runBlocking {
            (1..5).asFlow().map { "string $it" }
                .collect {
                    Log.i("minfo", it)
                }
        }
    
    transform

    在使用 transform 操作符时,可以任意多次调用 emit ,这是 transform 跟 map 最大的区别:

        fun transform() = runBlocking {
            (1..5).asFlow().transform {
                emit(it * 2)
                delay(1000)
                emit("String : $it")
            }.collect {
                Log.i("minfo", "" + it)
            }
        }
    

    打印:

    2
    String : 1
    4
    String : 2
    6
    String : 3
    8
    String : 4
    10
    String : 5
    
    onEach

    遍历自行处理

        fun onEach() = runBlocking {
            (1..5).asFlow()
                .onEach {
                    Log.i("minfo", "onEach $it")
                }.collect {
                    Log.i("minfo", "$it")
                }
        }
    
    filter

    按条件过滤

        fun filter() = runBlocking {
            (1..5).asFlow()
                .filter { it % 2 == 0 }
                .collect {
                    Log.i("minfo", "$it")
                }
        }
    
    take

    take 操作符只取前几个 emit 发射的值

        fun take() = runBlocking {
            (1..5).asFlow()
                .take(2).collect {
                    Log.i("minfo", "$it")
                }
        }
    
    zip

    zip 是可以将2个 flow 进行合并的操作符

        fun zip() = runBlocking {
            val flowA = (1..5).asFlow()
            val flowB = flowOf("one" , "two" , "three" , "four", "five")
            flowA.zip(flowB) { a, b ->
                "$a -- $b"
            }.collect {
                Log.i("minfo", "$it")
            }
        }
    

    打印结果:

    1 -- one
    2 -- two
    3 -- three
    4 -- four
    5 -- five
    
    flattenConcat 扁平化

    flattenConcat 将给定流按顺序展平为单个流,而不交错嵌套流。

        fun flattenConcat() = runBlocking {
            val flowA = (1..5).asFlow().onEach { delay(1000) }
            val flowB = flowOf("one", "two", "three","four","five").onEach { delay(1000) }
            flowOf(flowA, flowB).flattenConcat()
                .collect {
                    Log.i("minfo", "$it")
                }
        }
    

    打印结果:

     1
     2
     3
     4
     5
     one
     two
     three
     four
     five
    
    flatMapLatest

    当发射了新值之后,上个 flow 就会被取消。

        fun flatMapLatest() = runBlocking {
            (1..5).asFlow().onEach { delay(100) }
                .flatMapLatest {
                    flow {
                        Log.i("minfo", "begin flatMapLatest $it")
                        delay(200)
                        emit("String $it")
                        Log.i("minfo", "end flatMapLatest $it")
                    }
                }.collect {
                    Log.i("minfo", "$it")
                }
        }
    

    打印结果:

    begin flatMapLatest 1
    begin flatMapLatest 2
    begin flatMapLatest 3
    begin flatMapLatest 4
    begin flatMapLatest 5
    end flatMapLatest 5
    String 5
    

    参考:
    https://www.cnblogs.com/joy99/p/15805955.html#13-%E5%88%9B%E5%BB%BA%E5%B8%B8%E8%A7%84-flow-%E7%9A%84%E5%B8%B8%E7%94%A8%E6%96%B9%E5%BC%8F

    demo地址:

    https://github.com/running-libo/FlowUse

    相关文章

      网友评论

        本文标题:Kotlin Flow数据流各种操作符演练

        本文链接:https://www.haomeiwen.com/subject/ahqmwdtx.html