Kotlin 的 Flow 用于流式编程。
Flow 基本使用
fun main() {
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect {
println(it)
}
}
}
输出:
1
2
3
4
5
Flow 生命周期
fun main() {
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onStart {
println("onStart")
}.onCompletion {
println("onCompletion")
}.collect {
println(it)
}
}
}
输出:
onStart
1
2
3
4
5
onCompletion
Flow 发生异常时的生命周期
fun main() {
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
if (i == 3) throw Exception("my error")
}
}.onStart {
println("onStart")
}.onCompletion {
println("onCompletion")
}.collect {
println(it)
}
}
}
输出:
onStart
1
2
3
onCompletion
Exception in thread "main" java.lang.Exception: my error
可以看出,Flow 发生异常时,也会先回调 onCompletion,再抛出异常。
Flow 的 onCompletion 可以携带异常信息
fun main() {
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
if (i == 3) throw Exception("my error")
}
}.onStart {
println("onStart")
}.onCompletion { error ->
println("onCompletion $error")
}.collect {
println(it)
}
}
}
输出如下:
onStart
1
2
3
onCompletion java.lang.Exception: my error
Exception in thread "main" java.lang.Exception: my error
at ...
需要注意的是,catch 和 onCompletion 一起使用时,两者的顺序会影响运行结果:
- 如果在 onCompletion 之前调用 catch,只有 catch 中能收到异常信息,onCompletion 中将收不到异常信息
- 如果在 onCompletion 之后调用 catch,onCompletion 和 catch 中都可以收到异常信息
- 单独使用 onCompletion 时,如果出现异常,onCompletion 能捕获到异常。但它并不会 catch 这个异常,而是处理完后把异常抛出去。
Flow 异常处理
fun main() {
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
if (i == 3) throw Exception("my error")
}
}.onStart {
println("onStart")
}.onCompletion {
println("onCompletion")
}.catch { exception ->
println("catch:$exception")
}.collect {
println(it)
}
}
}
输出:
onStart
1
2
3
onCompletion
catch:java.lang.Exception: my error
Flow Retry 机制
fun main() {
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
if (i == 3) throw Exception("my error")
}
}.retry(3).onStart {
println("onStart")
}.onCompletion {
println("onCompletion")
}.catch { exception ->
println("catch:$exception")
}.collect {
println(it)
}
}
}
输出:
onStart
1
2
3
1
2
3
1
2
3
1
2
3
onCompletion
catch:java.lang.Exception: my error
Flow 超时机制
fun main() {
runBlocking {
withTimeout(200) {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onStart {
println("onStart")
}.onCompletion {
println("onCompletion")
}.collect {
println(it)
}
}
}
}
输出如下:
onStart
1
onCompletion
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms
at ...
Flow 超时处理
fun main() {
runBlocking {
try {
withTimeout(200) {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onStart {
println("onStart")
}.onCompletion {
println("onCompletion")
}.collect {
println(it)
}
}
} catch (e: TimeoutCancellationException) {
println("Timeout")
}
}
}
输出如下:
onStart
1
onCompletion
Timeout
暂时只能用 try-catch 做超时处理。
Flow 执行时间
fun main() {
runBlocking {
val startTime = System.currentTimeMillis()
flow {
for (i in 1..5) {
delay(100)
println("emit: $i")
emit(i)
}
}.collect {
delay(100)
println("collect: $it")
}
println("Cost: ${System.currentTimeMillis() - startTime}ms")
}
}
输出如下:
emit: 1
collect: 1
emit: 2
collect: 2
emit: 3
collect: 3
emit: 4
collect: 4
emit: 5
collect: 5
Cost: 1105ms
耗时约 1000ms,这是因为 Flow 的生产者和消费者是交替执行的。
Flow 生产者和消费者交替执行
网友评论