美文网首页
协程Flow

协程Flow

作者: 旺仔_100 | 来源:发表于2022-05-13 17:49 被阅读0次
一、介绍

Flow,在Kotlin协程当中是自成体系的知识点。简单的异步场景我们可以直接使用挂起函数、launch、async;复制的异步场景,我们可以使用Flow。Flow已经开始占领RxJava的领地,Flow还要取代LiveData了。Flow是很香的呀!

Flow就是"数据流"

我们首先创建数据,然后对数据做各种处理,最后结束数据流,拿到想要的结果。它跟Channel的区别是,Channel只能是发送数据和接受数据。而Flow是可以做中间的数据处理的。

fun main() = runBlocking {
    flow {            //上游,发源地
        emit(1) //挂起函数 发送数据
        emit(3)
        emit(5)
        emit(7)
        emit(9)
    }.filter { it > 3 }    //中间对数据处理
        .take(2)     //中间对数据处理
        .collect{          //下游  结束数据处理,拿到结果
            println(it)
        }
}

5
7

简单分析一下:

  • flow{},是一个高阶函数,创建了一个Flow。在它的lambda当中,我们可以使用emit发送一个数据。相当于创建一个数据流,并发送出去。
  • filter{} 、map{}、take(2),他们是对数据的中间处理,Flow的最大优势,就是他的操作符和集合的完全一样,上手成本低。
  • collect{},也被称为终止操作符或者末端操作符,他用来结束数据流,并接收这些数据。
采用flow和list的使用几乎一样
///使用其他的方式创建flow
fun main() = runBlocking {
    flowOf(2,4,6,8,10).filter { it > 4 }
        .map { it * 2 }
        .take(2)
        .collect{
            println(it)
        }


    listOf(2,4,6,8,10).filter {i -> i > 4  }
        .map { it * 2 }.take(2).forEach{
        println(it)
    }
}

第三种创建flow,可以通过list来转换成flow,当然也可以通过flow转换成list

//flow转list,list转flow
fun main() = runBlocking {
    //Flow 转list
    flowOf(1,2,3,4,5).toList()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .forEach {
            println(it)
        }

    listOf(1,2,3,4,5).asFlow()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect{
            println(it)
        }
}
中间操作符

中间操作符(Intermedite Operators),除了之前提到的map、filter这种从集合那边抄过来,还有一些特殊的操作符,他们是专门为Flow设计的。

Flow生命周期

在flow的中间操作符当中,onStart、onCompletion这两个是比较特殊的。他们是以操作符的形式存在,但实际上的作用,是监听生命周期回调。

onStart ,它的作用是注册一个监听事件:当flow启动以后,它就被回调。具体我们可以看下面这个例子:

fun main()= runBlocking {
    flowOf(1,2,3,4,5)
        .filter {
            println("filter $it")
            it > 2
        }
        .map {
            println("map : $it")
            it * 2
        }
        .take(2)
        .onStart { println("onStart") }
        .onCompletion { println("onCompletion") }
        .collect{
            println("collect:$it")
        }
}

onStart和onComplete跟位置没有关系,但是filter,map这些操作符是和位置有关系的。onComplete在下面三种情况都会进行回调:
1.Flow正常执行完毕
2.Flow当中出现异常
3.Flow被取消

//协程cancel 和协程异常
fun main() = runBlocking {
    launch {
        flow {
            emit(1)
            emit(2)
            emit(3)
        }.onCompletion { print("onComplete first : $it") }
            .collect{
                println("collect : $it")
                if (it == 2){
                    cancel()
                    println("cancel")
                }
            }
    }

    delay(100L)
    flowOf(4,5,6)
        .onCompletion { println("onCompletion second : $it") }
        .collect{
            println("collect:$it")
            throw IllegalStateException()
        }
}
catch异常的处理

我们可以使用两种方式来处理:

  • catch 关键字 ,但是这个中方式,只能对上游的代码进行捕获和处理
  • try catch 关键字 ,我们把collect中的代码全部用try catch来包裹就可以获取下游的异常。

如果觉得还要分两次抓,其实可以直接使用try catch把整个协程包起来

//异常处理
fun  main() = runBlocking {
//    try {
        flowOf(1,2,3,4)
            .filter { it / 0  == 1 }
        .catch {
            println("我抓到你了,但是我不处理")
        }
            .collect{
//                try {
//                    throw IllegalStateException()
                    println("collect $it")
//                }catch ( e : Exception){
//                    println("我抓到异常了")
//                }
            }
//    }catch (e : Exception ){
//        println("我抓到异常了吧")
//    }

}
切换Context:flowOn、launchIn

对于异步任务,我们经常需要频繁的切换线程,我们可以通过FlowOn来灵活实现。

fun  main() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit : 1")
        emit(2)
        logX("Emit : 2")
        emit(3)
        logX("Emit : 3")
    }
    flow
        .flowOn(Dispatchers.IO)
        .filter {
        logX("Filter:$it")
        it > 2
    }
        .flowOn(Dispatchers.Default)

        .collect{
            logX("Collect $it")
        }

}

FlowOn只能制定他上游的代码执行的线程。RxJava你还要搞清楚,观察者与被观察者,采用不同的方法切换线程。这个就是FlowOn去制定,是不是很方便!

当然collect里面的代码切换线程需要使用withContext{}.


fun  main() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit : 1")
        emit(2)
        logX("Emit : 2")
        emit(3)
        logX("Emit : 3")
    }
    flow
        .flowOn(Dispatchers.IO)
        .filter {
        logX("Filter:$it")
        it > 2
    }
        .flowOn(Dispatchers.Default)

        .collect{
            withContext(Dispatchers.IO){
                logX("Collect $it")
            }
        }

}

也可以使用withContext,但是并不推荐,可能出现问题,还有就是写法也不太好。

fun main() = runBlocking {
        val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit : 1")
        emit(2)
        logX("Emit : 2")
        emit(3)
        logX("Emit : 3")
    }
   withContext(Dispatchers.IO){
        flow.flowOn(Dispatchers.IO)
            .filter {
                logX("filter $it")
                it > 2
            }
            .collect{
                logX("logX : $it")
            }
   }
}

可以使用launchIn,更加优雅,但是launchIn里面调用了collect,它会结束整个数据流。


fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.IO)
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit : 1")
        emit(2)
        logX("Emit : 2")
        emit(3)
        logX("Emit : 3")
    }

    flow.flowOn(Dispatchers.Default)
        .filter {
            logX("filter $it")
            it > 2
        }

        .onEach {
            logX("onEach $it")
        }
//        .launchIn(scope)

       scope.launch {
           flow.collect{
               logX("collect $it")
           }
       }

    delay(1000)

}

总结一下:我们在Flow中线程切换使用flowOn、launchIn、withContext,实际中使用flowOn、launchIn就可以满足需求了。

下游:终止操作符

它代表着数据流动的终止,无法在进行数据的处理。我们前面用到的collect就是一个终止操作符。还有first()、single()、fold{}、reduce{}等。

为什么说Flow是"冷"的?

//看下冷流和热流的区别
fun main() = runBlocking {
    //冷流  下面的代码不会执行
    val  flow = flow {
        (1..3).forEach {
            println("before sent $it")
            emit(it)
            println("Send $it")
        }

    }

    //热流
    val channel = produce<Int>(capacity = 0) {
        (1..3).forEach {
            println("channel Before sent $it")
            send(it)
            println("channel sent $it")
        }
    }
    println("end")
}

end
channel Before sent 1

flow里面的代码并没有走,channel里面的代码是有走的。所有channel被称为热流,是因为不过有没有接收方,发送方都会工作。
flow被称为冷流,是因为只有调用终止操作符之后,Flow才会开始工作。

fun main()= runBlocking {
    flow {
        println("emit : 3")
        emit(3)
        println("emit : 4")
        emit(4)
        println("emit : 5")
        emit(5)
    }.filter {
        println("filter: $it")
        it > 2
    }.map {
        println("map:$it")
        it * 2
    }.collect{
        println("collect : $it")
    }
}

emit : 3
filter: 3
map:3
collect : 6
emit : 4
filter: 4
map:4
collect : 8
emit : 5
filter: 5
map:5
collect : 10

从上面的输出结果可以看出,flow是一条一条的处理数据的。而不是批量做同一个操作。所以这可以看出,flow “懒”的特性。

使用flow来做网络请求

///模拟下网络请求,使用flow来写  这里报错,Dispatchers.Main必须在android项目里面才有
//fun main() =  runBlocking{
//        fun  laodData() = flow {
//            repeat(3){
//                delay(100L)
//                emit(it)
//                logX("emit :$it")
//            }
//        }
//
//        fun updateUI(it: Int){  logX("updateUI $it")
//        }
//        fun showLoading(){
//            println("showLoading")}
//        fun hideLoading(){
//            println("hideLoading")
//        }
//        val uiScope = CoroutineScope(Dispatchers.Main)
//
//        laodData().onStart { showLoading() }
//            .map { it * 2 }
//            .flowOn(Dispatchers.IO)
//            .catch { cause: Throwable -> println(cause)
//                hideLoading()
//                emit(-1)}
//            .onEach { updateUI(it) }
//            .onCompletion { hideLoading() }
//            .launchIn(uiScope)
//
//        delay(10000L)
//
//    }


I/System.out: ============================================================
I/System.out: emit :0
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 0
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 2
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: emit :1
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: emit :2
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 4
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: hideLoading
总结
  • Flow就是数据流,可以分为上游,中游和下游终止操作符。
  • 对于上游来说,主要是创建flow和产生数据,主要有三个操作符:flow、flowof、asFlow。
  • 中游操作符,主要有仿集合的api,生命周期onStart,onComplete,线程切换flowOn,launchIn,withContext,捕获异常catch等。
  • 下游操作符,collect,仿集合api,old、reduce,flow.toList等。

相关文章

  • Kotlin Flow啊,你将流向何方?

    前言 前边一系列的协程文章铺垫了很久,终于要分析Flow了。如果说协程是Kotlin的精华,那么Flow就是协程的...

  • KSP - 元编程编译提速的小助手

    前言 前边一系列的协程文章铺垫了很久,终于要分析Flow了。如果说协程是Kotlin的精华,那么Flow就是协程的...

  • 协程Flow

    一、介绍 Flow,在Kotlin协程当中是自成体系的知识点。简单的异步场景我们可以直接使用挂起函数、launch...

  • kotlin协程,Flow,DataStore学习总结

    有人会问协程和Flow可以替换RxJava答案肯定是:可以的 这里总结了下kotlin协程以及Flow的学习记录,...

  • Kotlin协程:Flow基础原理

    本文分析示例代码如下: 一.Flow的创建 在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代...

  • 协程Flow简单使用

    前言 本文是阅读协程Flow的总结笔记。 什么是Flow Kotlin中的Flow API是可以更好的异步处理按顺...

  • 协程Flow之FlowCallAdapterFactory

    Flow是kotlin协程的一个类似RxJava的流式API,它的出现可以替代RxJava, 所以Retrofit...

  • Kotlin学习路线

    1:kotlin的教程,主要学习协程 和 flow 参考链接:https://www.jianshu.com/p/...

  • kotlin flow (二)

    Flow操作符 buffer(int) 该操作符会新起一个协程来收集buffer之前的代码运行结果,新协程通过ch...

  • Kotlin中 Flow、SharedFlow与StateFlo

    一、简介 了解过协程Flow[https://link.juejin.cn?target=https%3A%2F%...

网友评论

      本文标题:协程Flow

      本文链接:https://www.haomeiwen.com/subject/tipnyrtx.html