美文网首页
kotlin-Flow

kotlin-Flow

作者: Method | 来源:发表于2021-11-08 16:01 被阅读0次

    [toc]

    Flow 是什么

    LiveData提供了响应式编程的基础,搭建了一套数据观察者的使用框架,但是,它相当于RxJava这类的异步框架来说,有点略显单薄了,这也是经常被人诟病的问题,因此,Flow这个小三就顺应而生了。

    Flow作为一套==异步数据流框架==,几乎可以约等于RxJava,但借助Kotlin语法糖和协程,以及Kotlin的DSL语法,可以让Flow的写法变得异常简洁,让你直面人性最善良的地方,一切的黑暗和丑陋,都被编译器消化了。而且,Flow作为LiveData的进化版本,可以很好的和JetPack结合起来,作为全家桶的一员,为统一架构添砖加瓦。

    冷流和热流

    一个异步数据流,通常包含三部分:

    上游
    操作符
    下游
    

    所谓冷流,即下游无消费行为时,上游不会产生数据,只有下游开始消费,上游才从开始产生数据。

    而所谓热流,即无论下游是否有消费行为,上游都会自己产生数据。

    Flow操作符

    Flow和RxJava一样,用各种操作符撑起了异步数据流框架的半边天。Flow默认为冷流,即下游有消费时,才执行生产操作。

    所以,操作符也被分为两类——中间操作符和末端操作符,中间操作符不会产生消费行为,返回依然为Flow,而末端操作符,会产生消费行为,即触发流的生产。

    Flow的创建

    • flow

    通过flow{}构造器,可以快速创建Flow,在flow中,可以使用emit来生产数据(或者emitAll生产批量数据),示例如下。

    flow {
        for (i in 0..3) {
            emit(i.toString())
        }
    }
    
    
    • flowOf

    与listOf类似,Flow可以通过flowOf来产生有限的已知数据。

    flowOf(1, 2, 3)
    
    • asFlow

    asFlow用于将List转换为Flow。

    listOf(1,2,3).asFlow()
    
    
    • emptyFlow

    如题,创建一个空流。

    末端操作符

    末端操作符在调用之后,创建Flow的代码才会执行,这点和Sequence非常类似。

    • collect

    collect是最常用的末端操作符,示例如下。

    末端操作符都是suspend函数,所以需要运行在协程作用域中。

    flow {
        for (i in 0..3) {
            emit(i)
        }
    }.collect {
        it.e()
    }
    
    • collectIndexed

    带下标的collect,下标是Flow中的emit顺序。

    flowOf(4, 5, 6).collectIndexed { index, value ->
            "$index  $value".e()
    }
    
    • collectLatest

    collectLatest用于在collect中取消未来得及处理的数据,只保留当前最新的生产数据。

    //短时间范围内 会覆盖前一个数据
    flow {
        emit(7)
        delay(50)
        emit(8)
    }.collectLatest {
        "collecting $it".e()
        delay(100)
        "collected $it".e()
    }
    

    打印结果

    E/FlowAcy: collecting 7
    E/FlowAcy: collecting 8
    E/FlowAcy: collected 8
    
    • toCollection、toSet、toList

    这些操作符用于将Flow转换为Collection、Set和List。

    • launchIn

    在指定的协程作用域中直接执行Flow。

    flow {
        for (i in 0..3) {
            Log.d("xys", "emit value---$i")
            emit(i.toString())
        }
    }.launchIn(MainScope())
    
    • last、lastOrNull、first、firstOrNull

    返回Flow的最后一个值(第一个值),区别是last为空的话,last会抛出异常,而lastOrNull可空。

    flow {
        for (i in 0..3) {
            emit(i.toString())
        }
    }.last()
    

    状态操作符

    状态操作符不做任何修改,只是在合适的节点返回状态。

    • onStart:在上游生产数据前调用
    • onCompletion:在流完成或者取消时调用
    • onEach:在上游每次emit前调用
    • onEmpty:流中未产生任何数据时调用
    • catch:对上游中的异常进行捕获
    • retry、retryWhen:在发生异常时进行重试,retryWhen中可以拿到异常和当前重试的次数
    flow {
        for (i in 0..3) {
            emit(i)
        }
        //emit(listOf<String>()[1])
    }.onStart {
        "onStart...".e()
    }.onEach {
        "onEach $it".e()
    }.onCompletion {
        "onCompletion...".e()
    }.onEmpty {
        "onEmpty".e()
    }.retryWhen { cause, attempt ->
        attempt.e()
        attempt < 3
    }.catch {
        "catch $it".e()
    }.collect {
        "collect $it".e()
    }
    

    正常情况下打印结果

    onStart...
    onEach 0
    collect 0
    onEach 1
    collect 1
    onEach 2
    collect 2
    onEach 3
    collect 3
    onCompletion...
    

    异常情况下打印结果

    onStart...
    onCompletion...
    0
    onStart...
    onCompletion...
    1
    onStart...
    onCompletion...
    2
    onStart...
    onCompletion...
    3
    catch java.lang.IndexOutOfBoundsException
    

    另外,onCompletion也可以监听异常,代码如下所示

    .onCompletion { exception ->
        Log.d("xys", "Result---$exception")
    }
    

    Transform操作符

    与RxJava一样,在数据流中,我们可以利用操作符对数据进行各种变换,以满足操作流的不同需求。

    • map、mapLatest、mapNotNull

    map操作符将Flow的输入通过block转换为新的输出。

    flow {
        for (i in 0..3) {
            emit(i)
        }
    }.map {
        it * it
    }
    
    • transform

    transform操作符与map操作符有点一样,但又不完全一样,map是一对一的变换,而transform则可以完全控制流的数据,进行过滤、 重组等等操作都可以。

    flow {
        for (i in 0..3) {
            emit(i)
        }
    }.transform{
        "transform $it".e()
        if (it == 1){
            emit("hello")
        }
    }.collect {
        "collect $it".e()
    }
    

    打印结果

    transform 0
    transform 1
    collect hello
    transform 2
    transform 3
    
    • transformWhile

    transformWhile的返回值是一个bool类型,用来控制流的截断,如果返回true,则流继续执行,如果false,则流截断。

    flow {
        for (i in 0..3) {
            emit(i)
            "emit $i".e()
        }
    }.transformWhile{value ->
        emit(value)
        "transformWhile $value".e()
        true
    }.collect {
        "Result---$it".e()
    }
    

    过滤操作符

    • filter、filterInstance、filterNot、filterNotNull

    过滤操作符可以按条件、类型或者对过滤取反、取非空等条件进行操作。

    flowOf(1,2,3)
    .filter {
        it > 1
    }.collect {
        it.e()
    }
    
    • drop、dropWhile、take、takeWhile

    这类操作符可以丢弃前n个数据,或者是只拿前n个数据。带while后缀的,则表示按条件进行判断。

    flowOf(1,3,4,5,2)
     .drop(2)
     .collect {
         it.e()
     }
    
    • debounce

    debounce操作符用于防抖,指定时间内的值只接收最新的一个。

    flow {
        emit(1)
        delay(90)
        emit(2)
        delay(90)
        emit(3)
        delay(1010)
        emit(4)
        delay(1010)
        emit(5)
    }.debounce(1000).collect {
        it.e()
    }
    

    打印结果

    3
    4
    5
    
    • sample

    sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。

    • distinctUntilChangedBy

    去重操作符,可以按照指定类型的参数进行去重。

    组合操作符

    组合操作符用于将多个Flow的数据进行组合。

    • combine、combineTransform

    combine操作符可以连接两个不同的Flow。

    val flow1 = flowOf(1, 2).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
    flow1.combine(flow2) { i, s -> i.toString() + s }.collect {
        Log.d("xys", "Flow combine: $it")
    }
    

    打印结果

    D/xys: Flow combine: 1a
    D/xys: Flow combine: 2a
    D/xys: Flow combine: 2b
    D/xys: Flow combine: 2c
    

    可以发现,当两个Flow数量不同时,始终由Flow1开始,用其最新的元素,与Flow2的最新的元素进行组合,形成新的元素。

    • merge

    merge操作符用于将多个流合并。

    val flow1 = flowOf(1, 2).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
    listOf(flow1, flow2).merge().collect {
        Log.d("xys", "Flow merge: $it")
    }
    
    D/xys: Flow merge: 1
    D/xys: Flow merge: 2
    D/xys: Flow merge: a
    D/xys: Flow merge: b
    D/xys: Flow merge: c
    

    merge的输出结果是按照时间顺序,将多个流依次发射出来。

    • zip

    zip操作符会分别从两个流中取值,当一个流中的数据取完,zip过程就完成了。

    val flow1 = flowOf(1, 2).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
    flow1.zip(flow2) { i, s -> i.toString() + s }.collect {
        Log.d("xys", "Flow zip: $it")
    }
    
    D/xys: Flow zip: 1a
    D/xys: Flow zip: 2b
    

    线程切换

    • flowOn

    在Flow中,可以简单的使用flowOn来指定线程的切换,flowOn会对上游,以及flowOn之前的所有操作符生效。

    flow {
        for (i in 0..3) {
            Log.d("xys", "Emit Flow in ${Thread.currentThread().name}")
            emit(i)
        }
    }.map {
        Log.d("xys", "Map Flow in ${Thread.currentThread().name}")
        it * it
    }.flowOn(Dispatchers.IO).collect {
        Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
        Log.d("xys", "Result---$it")
    }
    

    这种情况下,flow和map的操作都将在子线程中执行。

    而如果是这样:

    flow {
        for (i in 0..3) {
            Log.d("xys", "Emit Flow in ${Thread.currentThread().name}")
            emit(i)
        }
    }.flowOn(Dispatchers.IO).map {
        Log.d("xys", "Map Flow in ${Thread.currentThread().name}")
        it * it
    }.collect {
        Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
        Log.d("xys", "Result---$it")
    }
    

    这样map就会执行在主线程了。

    同时,你也可以多次调用flowOn来不断的切换线程,让前面的操作符执行在不同的线程中。

    取消Flow

    Flow也是可以被取消的,最常用的方式就是通过withTimeoutOrNull来取消,代码如下所示。

    MainScope().launch {
        withTimeoutOrNull(2500) {
            flow {
                for (i in 1..5) {
                    delay(1000)
                    emit(i)
                }
            }.collect {
                Log.d("xys", "Flow: $it")
            }
        }
    }
    

    这样当输出1、2之后,Flow就被取消了。

    Flow的取消,实际上就是依赖于协程的取消。

    文章转载至

    作者:xuyisheng
    链接:https://juejin.cn/post/7020977260572180511

    相关文章

      网友评论

          本文标题:kotlin-Flow

          本文链接:https://www.haomeiwen.com/subject/vbnuzltx.html