Flow 源码如下
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
创建常规 Flow 的常用方式
flow{...}
private fun createFlow(): Flow<Int> = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
flowOf()
//flowOf() 构建器定义了一个发射固定值集的流, 使用 flowOf 构建 Flow 不需要显示调用 emit() 发射数据
//因为在扩展函数内部使用了emit
private fun createFlow2(): Flow<Int> = flowOf(1, 2, 3)
asFlow()
//使用 asFlow() 扩展函数,可以将各种集合与序列转换为流,也不需要显示调用 emit() 发射数据
//因为在扩展函数内部使用了emit
private fun createFlow3(): Flow<Int> = listOf(1, 2, 3).asFlow()
Flow 是冷流(惰性的)
在调用末端流操作符(collect 是其中之一)之前, flow{ ... } 中的代码不会执行。我们称之为冷流。
private fun createFlow(): Flow<Int> = flow {
Log.i("minfo", "flow started")
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
fun code() = runBlocking {
val flow = createFlow()
Log.i("minfo", "flow collect")
flow.collect {
Log.i("minfo", it.toString())
}
Log.i("minfo", "flow collect again")
flow.collect {
Log.i("minfo", it.toString())
}
}
打印结果:
这里先执行了flow collect,在执行的flow started,先collect才会flow才会执行。
flow collect
flow started
1
2
3
flow collect again
flow started
1
2
3
Flow 的取消
流的收集可以在当流在一个可取消的挂起函数(例如 delay)中挂起的时候取消。取消Flow 只需要取消它所在的协程即可。
fun main() = runBlocking {
withTimeoutOrNull(250) {
simple().collect {
Log.i("minfo", it.toString())
}
}
Log.i("minfo", "Done")
}
打印:
1
2
Done
末端流操作符
collect
收集上游发送的数据
reduce
reduce 类似于 Kotlin 集合中的 reduce 函数,能够对集合进行计算操作。
fun reduce() = runBlocking {
val sum = (1..5).asFlow().reduce { a, b ->
a + b
}
Log.i("minfo", sum.toString())
}
结果:
15
launchIn
launchIn 用来在指定的 CoroutineScope 内启动 flow, 需要传入一个参数: CoroutineScope。
我们希望并行执行两个 Flow,为每个 Flow 单独起一个协程:
fun launchIn() = runBlocking {
(1..5).asFlow()
.onEach { delay(100) }
.flowOn(Dispatchers.IO)
.onEach { Log.i("minfo", it.toString()) }
.launchIn(this)
flowOf("one", "two", "three", "four", "five")
.onEach { delay(200) }
.flowOn(Dispatchers.IO)
.onEach { Log.i("minfo", it.toString()) }
.launchIn(this)
}
打印结果:
1
one
2
3
two
4
5
three
four
five
流是连续的
Flow 的每次单独收集都是按顺序执行的,除非进行特殊操作的操作符使用多个流。 默认情况下不启动新协程。 从上游到下游每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。
示例:
fun influence() = runBlocking {
(1..5).asFlow()
.filter {
Log.i("minfo", "filter $it")
it % 2 == 0
}
.map {
Log.i("minfo", "map $it")
"String $it"
}
.collect {
Log.i("minfo", "collect $it")
}
}
打印结果:
filter 1
filter 2
map 2
collect String 2
filter 3
filter 4
map 4
collect String 4
filter 5
onStart 流启动时/onCompletion 流完成时
fun onStartCompletion() = runBlocking {
(1..5).asFlow()
.onEach { delay(200) }
.onStart {
Log.i("minfo", "onStart")
}
.onCompletion {
Log.i("minfo", "onCompletion")
}
.collect {
Log.i("minfo", "$it")
}
}
打印结果:
onStart
1
2
3
4
5
onCompletion
flowOn 切换线程
Flow 是基于 CoroutineContext 进行线程切换的。因为 Collect 是一个 suspend 函数,必须在 CoroutineScope 中执行,所以响应线程是由 CoroutineContext 决定的。比如,在 Main 线程总执行 collect, 那么响应线程就是 Dispatchers.Main。
Flows 通过 flowOn 方法来切换线程。
fun dispatcher() = runBlocking {
val mDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
(1..5).asFlow()
.onEach {
//生产数据
Log.i("minfo", "${Thread.currentThread().name} + product: $it")
}.flowOn(Dispatchers.IO)
.map {
//转换数据
Log.i("minfo", "${Thread.currentThread().name} + $it to String")
"String: $it"
}.flowOn(mDispatcher)
.onCompletion {
mDispatcher.close()
}
.collect {
//消费数据
Log.i("minfo", "${Thread.currentThread().name} + collect: + $it")
}
}
打印结果:
DefaultDispatcher-worker-1 + product: 1
DefaultDispatcher-worker-1 + product: 2
DefaultDispatcher-worker-1 + product: 3
DefaultDispatcher-worker-1 + product: 4
DefaultDispatcher-worker-1 + product: 5
pool-2-thread-1 + 1 to String
main + collect: + String: 1
pool-2-thread-1 + 2 to String
main + collect: + String: 2
pool-2-thread-1 + 3 to String
main + collect: + String: 3
pool-2-thread-1 + 4 to String
pool-2-thread-1 + 5 to String
main + collect: + String: 4
main + collect: + String: 5
可以看到,发射数据是在 Dispatchers.IO 线程执行的, map 操作时在我们自定义的线程池中进行的,collect 操作在 Dispatchers.Main 线程进行。
Flow 中间转换操作符
map
map 操作符用于 Flow 表示将流中的每个元素进行转换后再发射出来。
fun map() = runBlocking {
(1..5).asFlow().map { "string $it" }
.collect {
Log.i("minfo", it)
}
}
transform
在使用 transform 操作符时,可以任意多次调用 emit ,这是 transform 跟 map 最大的区别:
fun transform() = runBlocking {
(1..5).asFlow().transform {
emit(it * 2)
delay(1000)
emit("String : $it")
}.collect {
Log.i("minfo", "" + it)
}
}
打印:
2
String : 1
4
String : 2
6
String : 3
8
String : 4
10
String : 5
onEach
遍历自行处理
fun onEach() = runBlocking {
(1..5).asFlow()
.onEach {
Log.i("minfo", "onEach $it")
}.collect {
Log.i("minfo", "$it")
}
}
filter
按条件过滤
fun filter() = runBlocking {
(1..5).asFlow()
.filter { it % 2 == 0 }
.collect {
Log.i("minfo", "$it")
}
}
take
take 操作符只取前几个 emit 发射的值
fun take() = runBlocking {
(1..5).asFlow()
.take(2).collect {
Log.i("minfo", "$it")
}
}
zip
zip 是可以将2个 flow 进行合并的操作符
fun zip() = runBlocking {
val flowA = (1..5).asFlow()
val flowB = flowOf("one" , "two" , "three" , "four", "five")
flowA.zip(flowB) { a, b ->
"$a -- $b"
}.collect {
Log.i("minfo", "$it")
}
}
打印结果:
1 -- one
2 -- two
3 -- three
4 -- four
5 -- five
flattenConcat 扁平化
flattenConcat 将给定流按顺序展平为单个流,而不交错嵌套流。
fun flattenConcat() = runBlocking {
val flowA = (1..5).asFlow().onEach { delay(1000) }
val flowB = flowOf("one", "two", "three","four","five").onEach { delay(1000) }
flowOf(flowA, flowB).flattenConcat()
.collect {
Log.i("minfo", "$it")
}
}
打印结果:
1
2
3
4
5
one
two
three
four
five
flatMapLatest
当发射了新值之后,上个 flow 就会被取消。
fun flatMapLatest() = runBlocking {
(1..5).asFlow().onEach { delay(100) }
.flatMapLatest {
flow {
Log.i("minfo", "begin flatMapLatest $it")
delay(200)
emit("String $it")
Log.i("minfo", "end flatMapLatest $it")
}
}.collect {
Log.i("minfo", "$it")
}
}
打印结果:
begin flatMapLatest 1
begin flatMapLatest 2
begin flatMapLatest 3
begin flatMapLatest 4
begin flatMapLatest 5
end flatMapLatest 5
String 5
网友评论