一、介绍
Flow,在Kotlin协程当中是自成体系的知识点。简单的异步场景我们可以直接使用挂起函数、launch、async;复制的异步场景,我们可以使用Flow。Flow已经开始占领RxJava的领地,Flow还要取代LiveData了。Flow是很香的呀!
Flow就是"数据流"
我们首先创建数据,然后对数据做各种处理,最后结束数据流,拿到想要的结果。它跟Channel的区别是,Channel只能是发送数据和接受数据。而Flow是可以做中间的数据处理的。
fun main() = runBlocking {
flow { //上游,发源地
emit(1) //挂起函数 发送数据
emit(3)
emit(5)
emit(7)
emit(9)
}.filter { it > 3 } //中间对数据处理
.take(2) //中间对数据处理
.collect{ //下游 结束数据处理,拿到结果
println(it)
}
}
5
7
简单分析一下:
- flow{},是一个高阶函数,创建了一个Flow。在它的lambda当中,我们可以使用emit发送一个数据。相当于创建一个数据流,并发送出去。
- filter{} 、map{}、take(2),他们是对数据的中间处理,Flow的最大优势,就是他的操作符和集合的完全一样,上手成本低。
- collect{},也被称为终止操作符或者末端操作符,他用来结束数据流,并接收这些数据。
采用flow和list的使用几乎一样
///使用其他的方式创建flow
fun main() = runBlocking {
flowOf(2,4,6,8,10).filter { it > 4 }
.map { it * 2 }
.take(2)
.collect{
println(it)
}
listOf(2,4,6,8,10).filter {i -> i > 4 }
.map { it * 2 }.take(2).forEach{
println(it)
}
}
第三种创建flow,可以通过list来转换成flow,当然也可以通过flow转换成list
//flow转list,list转flow
fun main() = runBlocking {
//Flow 转list
flowOf(1,2,3,4,5).toList()
.filter { it > 2 }
.map { it * 2 }
.take(2)
.forEach {
println(it)
}
listOf(1,2,3,4,5).asFlow()
.filter { it > 2 }
.map { it * 2 }
.take(2)
.collect{
println(it)
}
}
中间操作符
中间操作符(Intermedite Operators),除了之前提到的map、filter这种从集合那边抄过来,还有一些特殊的操作符,他们是专门为Flow设计的。
Flow生命周期
在flow的中间操作符当中,onStart、onCompletion这两个是比较特殊的。他们是以操作符的形式存在,但实际上的作用,是监听生命周期回调。
onStart ,它的作用是注册一个监听事件:当flow启动以后,它就被回调。具体我们可以看下面这个例子:
fun main()= runBlocking {
flowOf(1,2,3,4,5)
.filter {
println("filter $it")
it > 2
}
.map {
println("map : $it")
it * 2
}
.take(2)
.onStart { println("onStart") }
.onCompletion { println("onCompletion") }
.collect{
println("collect:$it")
}
}
onStart和onComplete跟位置没有关系,但是filter,map这些操作符是和位置有关系的。onComplete在下面三种情况都会进行回调:
1.Flow正常执行完毕
2.Flow当中出现异常
3.Flow被取消
//协程cancel 和协程异常
fun main() = runBlocking {
launch {
flow {
emit(1)
emit(2)
emit(3)
}.onCompletion { print("onComplete first : $it") }
.collect{
println("collect : $it")
if (it == 2){
cancel()
println("cancel")
}
}
}
delay(100L)
flowOf(4,5,6)
.onCompletion { println("onCompletion second : $it") }
.collect{
println("collect:$it")
throw IllegalStateException()
}
}
catch异常的处理
我们可以使用两种方式来处理:
- catch 关键字 ,但是这个中方式,只能对上游的代码进行捕获和处理
- try catch 关键字 ,我们把collect中的代码全部用try catch来包裹就可以获取下游的异常。
如果觉得还要分两次抓,其实可以直接使用try catch把整个协程包起来
//异常处理
fun main() = runBlocking {
// try {
flowOf(1,2,3,4)
.filter { it / 0 == 1 }
.catch {
println("我抓到你了,但是我不处理")
}
.collect{
// try {
// throw IllegalStateException()
println("collect $it")
// }catch ( e : Exception){
// println("我抓到异常了")
// }
}
// }catch (e : Exception ){
// println("我抓到异常了吧")
// }
}
切换Context:flowOn、launchIn
对于异步任务,我们经常需要频繁的切换线程,我们可以通过FlowOn来灵活实现。
fun main() = runBlocking {
val flow = flow {
logX("Start")
emit(1)
logX("Emit : 1")
emit(2)
logX("Emit : 2")
emit(3)
logX("Emit : 3")
}
flow
.flowOn(Dispatchers.IO)
.filter {
logX("Filter:$it")
it > 2
}
.flowOn(Dispatchers.Default)
.collect{
logX("Collect $it")
}
}
FlowOn只能制定他上游的代码执行的线程。RxJava你还要搞清楚,观察者与被观察者,采用不同的方法切换线程。这个就是FlowOn去制定,是不是很方便!
当然collect里面的代码切换线程需要使用withContext{}.
fun main() = runBlocking {
val flow = flow {
logX("Start")
emit(1)
logX("Emit : 1")
emit(2)
logX("Emit : 2")
emit(3)
logX("Emit : 3")
}
flow
.flowOn(Dispatchers.IO)
.filter {
logX("Filter:$it")
it > 2
}
.flowOn(Dispatchers.Default)
.collect{
withContext(Dispatchers.IO){
logX("Collect $it")
}
}
}
也可以使用withContext,但是并不推荐,可能出现问题,还有就是写法也不太好。
fun main() = runBlocking {
val flow = flow {
logX("Start")
emit(1)
logX("Emit : 1")
emit(2)
logX("Emit : 2")
emit(3)
logX("Emit : 3")
}
withContext(Dispatchers.IO){
flow.flowOn(Dispatchers.IO)
.filter {
logX("filter $it")
it > 2
}
.collect{
logX("logX : $it")
}
}
}
可以使用launchIn,更加优雅,但是launchIn里面调用了collect,它会结束整个数据流。
fun main() = runBlocking {
val scope = CoroutineScope(Dispatchers.IO)
val flow = flow {
logX("Start")
emit(1)
logX("Emit : 1")
emit(2)
logX("Emit : 2")
emit(3)
logX("Emit : 3")
}
flow.flowOn(Dispatchers.Default)
.filter {
logX("filter $it")
it > 2
}
.onEach {
logX("onEach $it")
}
// .launchIn(scope)
scope.launch {
flow.collect{
logX("collect $it")
}
}
delay(1000)
}
总结一下:我们在Flow中线程切换使用flowOn、launchIn、withContext,实际中使用flowOn、launchIn就可以满足需求了。
下游:终止操作符
它代表着数据流动的终止,无法在进行数据的处理。我们前面用到的collect就是一个终止操作符。还有first()、single()、fold{}、reduce{}等。
为什么说Flow是"冷"的?
//看下冷流和热流的区别
fun main() = runBlocking {
//冷流 下面的代码不会执行
val flow = flow {
(1..3).forEach {
println("before sent $it")
emit(it)
println("Send $it")
}
}
//热流
val channel = produce<Int>(capacity = 0) {
(1..3).forEach {
println("channel Before sent $it")
send(it)
println("channel sent $it")
}
}
println("end")
}
end
channel Before sent 1
flow里面的代码并没有走,channel里面的代码是有走的。所有channel被称为热流,是因为不过有没有接收方,发送方都会工作。
flow被称为冷流,是因为只有调用终止操作符之后,Flow才会开始工作。
fun main()= runBlocking {
flow {
println("emit : 3")
emit(3)
println("emit : 4")
emit(4)
println("emit : 5")
emit(5)
}.filter {
println("filter: $it")
it > 2
}.map {
println("map:$it")
it * 2
}.collect{
println("collect : $it")
}
}
emit : 3
filter: 3
map:3
collect : 6
emit : 4
filter: 4
map:4
collect : 8
emit : 5
filter: 5
map:5
collect : 10
从上面的输出结果可以看出,flow是一条一条的处理数据的。而不是批量做同一个操作。所以这可以看出,flow “懒”的特性。
使用flow来做网络请求
///模拟下网络请求,使用flow来写 这里报错,Dispatchers.Main必须在android项目里面才有
//fun main() = runBlocking{
// fun laodData() = flow {
// repeat(3){
// delay(100L)
// emit(it)
// logX("emit :$it")
// }
// }
//
// fun updateUI(it: Int){ logX("updateUI $it")
// }
// fun showLoading(){
// println("showLoading")}
// fun hideLoading(){
// println("hideLoading")
// }
// val uiScope = CoroutineScope(Dispatchers.Main)
//
// laodData().onStart { showLoading() }
// .map { it * 2 }
// .flowOn(Dispatchers.IO)
// .catch { cause: Throwable -> println(cause)
// hideLoading()
// emit(-1)}
// .onEach { updateUI(it) }
// .onCompletion { hideLoading() }
// .launchIn(uiScope)
//
// delay(10000L)
//
// }
I/System.out: ============================================================
I/System.out: emit :0
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 0
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 2
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: emit :1
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: emit :2
I/System.out: Thread:DefaultDispatcher-worker-3
I/System.out: ============================================================
I/System.out: ============================================================
I/System.out: updateUI 4
I/System.out: Thread:main
I/System.out: ============================================================
I/System.out: hideLoading
总结
- Flow就是数据流,可以分为上游,中游和下游终止操作符。
- 对于上游来说,主要是创建flow和产生数据,主要有三个操作符:flow、flowof、asFlow。
- 中游操作符,主要有仿集合的api,生命周期onStart,onComplete,线程切换flowOn,launchIn,withContext,捕获异常catch等。
- 下游操作符,collect,仿集合api,old、reduce,flow.toList等。
网友评论