Flow是一种类似于序列的冷流(冷启动l,即lazy的),flow构建器中的代码直接到流被收集的时候才运行。
一、流的特性
- 流的每次单独收集都是按顺序执行的,除非使用特殊的操作符。
- 从上游到下游每个过度操作符都会处理每个发射出的值,然后再交给末端操作符
下面距离说明下
fun testFlow() = runBlocking<Unit> {
(1..5).asFlow().filter {
it%2==0
}.map {
"string $it"
}.collect {
println("Collect $it")
}
}
二、流的上下文
- 流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。
- flow{}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)。
- flowOn操作符,该函数用于更改流发射的上下文。
fun testFlowOn() = runBlocking<Unit> {
flow<Int> {
println("start ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
.flowOn(Dispatchers.IO+CoroutineName("IOWorker"))
.collect {
println("Collect $it ${Thread.currentThread().name}")
}
}
//print result
//start DefaultDispatcher-worker-1 @IOWorker#2
//Collect 1 main @coroutine#1
//Collect 2 main @coroutine#1
//Collect 3 main @coroutine#1
- 使用launchIn函数指定协程中收集流 ,launchIn函数返回一个Job对象
fun testLaunchIn() = runBlocking<Unit> {
flow<Int> {
println("start ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
.flowOn(Dispatchers.IO + CoroutineName("IOWorker"))
.onEach {
println("onEach $it ${Thread.currentThread().name}")
}
.launchIn(CoroutineScope(Dispatchers.Default + CoroutineName("LaunchWorker")))
.join()
}
//print result
//start DefaultDispatcher-worker-2 @IOWorker#3
//onEach 1 DefaultDispatcher-worker-1 @LaunchWorker#2
//onEach 2 DefaultDispatcher-worker-2 @LaunchWorker#2
//onEach 3 DefaultDispatcher-worker-1 @LaunchWorker#2
三、流的取消、流的取消检测
- 流构建器对每个发射执行附加的ensureActive检测以进行取消,这意味着从flow{...}发出的繁忙循环是可以取消的。
- 出于性能原因,大多数流操作不会执行其他取消检测,在协程处于繁忙的循环情况下,必须明确检测是否可以取消(通过cancellable操作符执行此操作)。
//如果不使用cancellable操作符 取消会失败,因为协程比较繁忙。
fun testCancel() = runBlocking<Unit> {
flow<Int> {
for (i in 1..5) {
println("emit $i")
emit(i)
}
}.cancellable()
.collect {
println(it)
if (it == 3) {
cancel()
}
}
}
//print result
//emit 1
//1
//emit 2
//2
//emit 3
四、背压
水流受到与流动方向一致的压力称为背压。—>(背压) =>(水流)
在生产者与消费者模型(Flow)中,生产者的生产速度大于消费者的的消费速度就会产生背压。
- buffer(),实现并发运行流中发射元素的代码。
- conflate(),合并发射项,不对每个值进行处理。
- collectLatest(),取消并重新发射最后一个值。
- 当必须更改CoroutineDispatcher时,flowOn操作符使用了同样的缓冲机制(buffer),但是buffer函数显示的请求缓冲而不改变执行上下文。
fun testBuffer() = runBlocking<Unit> {
val duration = measureTimeMillis {
flow<Int> {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.buffer(10)
.collect {
delay(300)
println("Collected ${Thread.currentThread().name}")
}
}
println("total duration $duration")
}
//print result
//...
//total duration 1811
//总时间少于2000ms
//conflate()处理最新值
fun testConflate() = runBlocking<Unit> {
val duration = measureTimeMillis {
flow<Int> {
for (i in 1..5) {
delay(100)
println("emit $i ")
emit(i)
}
}
.conflate()
.collect {
delay(300)
println("Collected $it in ${Thread.currentThread().name}")
}
}
println("total duration $duration")
}
//emit 1
//emit 2
//emit 3
//Collected 1 in main @coroutine#1
//emit 4
//emit 5
//Collected 3 in main @coroutine#1
//Collected 5 in main @coroutine#1
//total duration 1249
//collectLatest 收集最后一个值
fun testCollectLatest() = runBlocking<Unit> {
val duration = measureTimeMillis {
flow<Int> {
for (i in 1..4) {
delay(100)
println("emit $i ")
emit(i)
}
}
.collectLatest {
delay(300)
println("Collected $it in ${Thread.currentThread().name}")
}
}
println("total duration $duration")
}
//print result
//emit 1
//emit 2
//emit 3
//emit 4
//Collected 4 in main @coroutine#6
//total duration 940
网友评论