1.什么是Flow ?
一种异步数据流,它按顺序发出值并正常或异常完成。
Flow 是一种 "冷流"(Cold Stream)。
"冷流" 是一种数据源,该类数据源的生产者会在每个监听者开始消费事件的时候执行,
从而在每个订阅上创建新的数据流。一旦消费者停止监听或者生产者的阻塞结束,数据流将会被自动关闭。
2.LiveData 和 Flow 的区别?
livedata: 是一种可感知生命周期的组件,持有可被观察的数据。
不会发生内存泄漏,保持最新的数据,(onStart时获取最新数据,相反也会丢失数据)。
flow :异步流 无生命周期感知,不会丢失数据,配合协程使用
3.简单介绍flow 使用
数据源 flow{}
每500ms emit一个数据
val flow = flow<Int> {
repeat(100) {
emit(it)
delay(500)
}
}
消费:
onEach :返回上一个流,并执行对应的 action
订阅 collect
collect :函数是一个 suspend 方法,所以它必须发生在协程或者带有 suspend 的方法里面
onCompletion : 返回上一个流,完成or取消执行action
catch:上部流 的异常捕获
flow.onEach {
println("onEach $it")
}.onCompletion {}
.catch { }
.collect {
println("collect $it")
}
launchIn:
除了 collect flow 还有 launchIn并发执行
launchIn 可以把flow 放在协程作用域中,
scope 作用域直接触发flow .跟随这scope的取消而取消
------------------------------------------
1. collect | launchIn 主要区别:
collect 是阻塞的 串行执行
launchIn 是非阻塞的 并行执行
/**
* 交替执行
输出:
it1 = 0
it2 = 0
it1 = 1
it2 = 1
it1 = 2
it2 = 2
it1 = 3
it2 = 3
*/
suspend fun launchIn() {
val scope = CoroutineScope(Job())
val job = scope.launch {
delay(2000)
}
val flow1 = flow {
List(100) {
emit("it1 = $it")
}
}.onEach {
delay(500)
println(it)
}
val flow2 = flow {
List(100) {
emit("it2 = $it")
}
}.onEach {
delay(500)
println(it)
}
flow1.launchIn(scope)
flow2.launchIn(scope)
job.cancelAndJoin()
delay(2500)
}
4.channel
协程间通信 (java:Handler)
一个面向多协程之间数据传输的 BlockQueue
receive() 只能获取一次
获取多次 遍历 channel
suspend fun channel() {
val channel = Channel<String>()
val job = CoroutineScope(SupervisorJob())
.launch {
launch {
repeat(100) {
channel.send("$it")
delay(100)
}
}
launch {
while (isActive) {
println("receive = ${channel.receive()}")
}
}
}
withTimeoutOrNull(11000) {
job.join()
}
}
4.flow 常用操作符
1. filter map take transform
/**
*
filter 过滤
map 转换操作符,将 A 变成 B
take 接收多少个 emit 出的值
transform 对给定流的每个值应用[转换]函数
输出:
collect 4
collect 5
*/
suspend fun operator2() {
flowOf(1, 2, 3, 4)
.transform {
emit(it +2)
}
.filter {
it > 1
}
.map {
it + 1
}.take(2)
.onEach {
println("collect $it")
}.collect()
}
single:
/**
* 发射单个值
* it = 0
* 多值会发生 :Flow has more than one element
*/
suspend fun single() {
val flow1 = flow {
List(2) {
emit("it = $it")
delay(100)
}
}
println(flow1.single())
}
2.zip merge combine
/**
* zip 组合两个流,双方都有新数据才会发射处理
*
* 将[当前]流中的值与[其它]流压缩,使用所提供的[transform]函数应用于每一对值。
* 一旦一个流完成,产生的流就完成了,并且在剩余的流上调用cancel
*/
val flow1 = flow {
List(4) { emit(it) }
}
val flow2 = flow {
List(2) { emit(it) }
}.map {
"it = $it"
}
flow1.zip(flow2) { f1, f2 ->
"f1: $f1 - f2: $f2"
}.collect {
println(it)
}
输出:f1: 0 - f2: it = 0
f1: 1 - f2: it = 1
/**
* merge 将给定的流合并成单个流
*/
suspend fun merge() {
val flow1 = flow {
List(3) {
emit(it)
delay(100)
}
}
val flow2 = flow {
List(2) {
emit(it)
delay(200)
}
}.map {
"it = $it"
}
listOf(flow2, flow1).merge().collect {
delay(100)
println(it)
}
}
输出:
it = 0
0
1
it = 1
2
/**
* combine
* 组合两个流,在经过第一次发射以后,任意方有新数据来的时候就可以发射,另一方有可能是已经发射过的数据
*
* 每个最近的流的值的组合
*
*
*/
suspend fun combine() {
val flow1 = flow {
List(1) {
emit(it)
delay(100)
}
}
val flow2 = flow {
List(4) {
emit(it)
delay(150)
}
}.map {
"it = $it"
}
flow1.combine(flow2) { f1, f2 ->
"f1: $f1 - f2: $f2"
}.collect {
delay(100)
println(it)
}
}
输出:
f1: 0 - f2: it = 0
f1: 1 - f2: it = 0
f1: 1 - f2: it = 1
f1: 1 - f2: it = 2
f1: 1 - f2: it = 3
3.flatten家族
/**
*
* flattenConcat 串行处理数据 是按顺序拼接的
* flattenMerge,并发 collect 数据 它会并发拼接,因此结果不会保证顺序
*
*/
suspend fun flattenConcat() {
val flow = flow {
List(4) {
emit(it)
delay(110)
}
}.map {
flow {
List(it) { emit(it) }
delay(150)
}
}
val time = measureTimeMillis {
flow.flattenConcat()
.collect { println(it) }
}
println(time)
}
输出:
flattenConcat -> 0 , 01 ,012, 0123 time:1066
flattenMerge -> 0 , 01 ,012, 0123 time:549
/**
*
flatMapConcat:等待内部流完成,然后开始收集下一个流
1: First -- 122
1: Second -- 623
2: First -- 725
2: Second -- 1226
3: First -- 1328
3: Second -- 1830
1830
flatMapMerge:
同时收集所有传入流并将其值合并到单个流中,以便尽快发出值
先执行序列 map{requestFlow(it)},然后对返回值调用 flattenMerge
1: First -- 215
2: First -- 296
3: First -- 399
1: Second -- 714
2: Second -- 797
3: Second -- 902
949
*
*
*/
suspend fun flatMapConcat() {
val start = System.currentTimeMillis()
val time = measureTimeMillis {
(1..3).asFlow()
.onEach { delay(100) }
.flatMapMerge {
requestFlow(it)
}.collect {
println("$it -- ${System.currentTimeMillis() - start}")
}
}
println(time)
}
4. Flow 的背压
/**
* Flow 的背压
* buffer并发 数据发射并发,collect 不并发
*
* 未使用 buffer, 一个emit 一个collect
* collect 0
emit 0
...
collect 3
emit 3
总时间 2828
buffer 后:
emit 0
emit 1
collect 0
emit 2
emit 3
collect 1
collect 2
collect 3
2064
*
*/
suspend fun buffer() {
val flow = flow<Int> {
repeat(4) {
emit(it)
delay(200)
println("emit $it")
}
}
val time = measureTimeMillis {
flow.buffer()
.collect {
delay(500)
println("collect $it")
}
}
println(time)
}
/**
* 背压?
如果我们只是单纯地添加缓存,而不是从根本上解决问题就始终会造成数据积压
问题产生的根本原因是生产和消费速率的不匹配,
除直接优化消费者的性能以外,我们也可以采取一些取舍的手段
1.conflate。与 Channel 的 Conflate 模式一致,
新数据会覆盖老数据
emit 0
emit 1
emit 2
emit 3
collect 0
collect 3
1073
*/
suspend fun conflate() {
val flow = flow<Int> {
repeat(4) {
emit(it)
println("emit $it")
}
}
val time = measureTimeMillis {
flow.conflate()
.collect {
delay(500)
println("collect $it")
}
}
println(time)
}
/**
* 2.collectLatest
* emit 0
emit 1
emit 2
emit 3
collect 3
569
*
*/
suspend fun collectLatest() {
val flow = flow<Int> {
repeat(4) {
emit(it)
delay(100)
println("emit $it")
}
}
val time = measureTimeMillis {
flow
.collectLatest {
delay(500)
println("collect $it")
}
}
println(time)
}
5.Flow 生产者和消费者的通信是同步非阻塞的,也就是生产和消费会顺序交替进行,channelFlow 可实现生产消费可以并行执行
/**
*
* channelFlow 生产消费可以并行执行
*
0
1
2
3
channelFlow = 2082
0
1
2
3
flow emit = 4019
*
*/
suspend fun channelFlow() {
val time1 = measureTimeMillis {
productChannelFlow().collect {
delay(500)
println(it)
}
}
println("channelFlow = $time1")
val time2 = measureTimeMillis {
product2().collect {
delay(500)
println(it)
}
}
println("flow emit = $time2")
val time3 = measureTimeMillis {
createChannelFlow()
}
println("createChannelFlow = $time3")
}
网友评论