通道
延期的值提供了⼀种便捷的方法使单个值在多个协程之间进行相互传输。通道提供了⼀种在流中传输值的方法。
通道基础
⼀个 Channel 是⼀个和 BlockingQueue 非常相似的概念。其中⼀个不同是它代替了阻塞的 put 操作并提供了挂起的 send,还替代了阻塞的 take 操作并提供了挂起的 receive。
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
}
repeat(5) { println(channel.receive()) }
println("Done!")
}
输出:
1
4
9
16
25
Done!
关闭与迭代通道
和队列不同,⼀个通道可以通过被关闭来表明没有更多的元素将会进入通道。在接收者中可以定期的使用 for 循环来从通道中接收元素。
⼀个 close 操作就像向通道发送了⼀个特殊的关闭指令。这个迭代停止就说明关闭指令已经被接收了。所以这里保证所有先前发送出去的元素都在通道关闭前被接收到。
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close()
}
for (y in channel) println(y)
println("Done!")
}
构建通道生产者
将生产者抽象成⼀个函数,并且使通道作为它的参数。
协程构建器 produce ,可以很容易的在生产者端正确工作,并且我们使用扩展函数 consumeEach 在消费者端替代 for 循环:
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
管道
管道是⼀种⼀个协程在流中开始生产可能无穷多个元素的模式。并且另⼀个或多个协程开始消费这些流,做⼀些操作,并生产了⼀些额外的结果。
fun main() = runBlocking {
val numbers = produceNumbers()
val squares = square(numbers)
repeat(5) {
println(squares.receive()) // 输出前五个
}
println("Done!") // 完成
coroutineContext.cancelChildren() // 取消子协程
}
fun CoroutineScope.produceNumbers() = produce {
var x = 1
while (true) send(x++) // 在流中开始从1生产无穷多个整数
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x) //对数字做平方操作
}
使用管道的素数
fun main() = runBlocking {
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren()
}
//开启了⼀个数字的无限序列
fun CoroutineScope.numbersFrom(start: Int) = produce {
var x = start
while (true) send(x++)
}
//管道中过滤了来源于流中的数字,删除了所有可以被给定素数整除的数字
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce {
for (x in numbers) if (x % prime != 0) send(x)
}
输出:
2
3
5
7
11
13
17
19
23
29
扇出
多个协程也许会接收相同的管道,在它们之间进行分布式工作。
fun main() = runBlocking {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel()
}
fun CoroutineScope.produceNumbers() = produce {
var x = 1
while (true) {
send(x++)
delay(100)
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
接收每个特定整数的处理器 id 可能会不同。输出:
Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9
取消生产者协程将关闭它的通道,从而最终终止处理器协程正在执行的此通道上的迭代。与 consumeEach 不同,这个 for 循环是安全完美地使用多个协程的。如果其中⼀个处理器协程执行失败,其它的处理器协程仍然会继续处理通道,而通过 consumeEach 编写的处理器始终在正常或非正常完成时消耗(取消)底层通道。
扇入
多个协程可以发送到同⼀个通道。
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
输出:
foo
foo
BAR!
foo
foo
BAR!
带缓冲的通道
无缓冲的通道在发送者和接收者相遇时传输元素(也称“对接”)。如果发送先被调用,则它将被挂起直到接收被调用,如果接收先被调用,它将被挂起直到发送被调用。
Channel() 工⼚函数与 produce 建造器通过⼀个可选的参数 capacity 来指定缓冲区大小 。缓冲允许发送者在被挂起前发送多个元素,就像 BlockingQueue 有指定的容量⼀样,当缓冲区被占满的时候将会引起阻塞。
fun main() = runBlocking {
val channel = Channel<Int>(4) // 启动带缓冲的通道
val sender = launch { // 启动发送者协程
repeat(10) {
println("Sending $it")
channel.send(it) // 将在缓冲区被占满时挂起
}
}
// 没有接收到东西……只是等待……
delay(1000)
sender.cancel()// 取消发送者协程
}
输出:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
前四个元素被加入到了缓冲区并且发送者在试图发送第五个元素的时候被挂起。
通道是公平的
发送和接收操作是公平的并且尊重调用它们的多个协程。它们遵守先进先出原则。
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>()// ⼀个共享的 table(桌⼦,即通道,有点扇出扇入结合的意思)
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0))
delay(1000)
coroutineContext.cancelChildren()
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) {// 在循环中接收球
ball.hits++
println("$name $ball")
delay(300)
table.send(ball)// 将球发送回去
}
}
输出:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
“乒”协程首先被启动,所以它首先接收到了球。甚至虽然“乒” 协程在将球发送会桌子以后立即开始接收,但是球 还是被“乓” 协程接收了,因为它⼀直在等待着接收球。
计时器通道
计时器通道是⼀种特别的会合通道,每次经过特定的延迟都会从该通道进行消费并产生 Unit 。虽然它看起来似乎没用,它被用来构建分段来创建复杂的基于时间的 produce 管道和进行窗口化操作以及其它时间相关的处理。可以在 select 中使用计时器通道来进行“打勾”操作。
使用工厂方法ticker来创建这些通道。为了表明不需要其它元素,请使用 ReceiveChannel.cancel 方法。
fun main() = runBlocking {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)//创建计时器通道
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement")
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// 模拟⼤量消费延迟
println("Consumer pauses for 150ms")
delay(150)
// 下⼀个元素⽴即可⽤
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// 请注意,`receive` 调⽤之间的暂停被考虑在内,下⼀个元素的到达速度更快
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
// 表明不再需要更多的元素
tickerChannel.cancel()
}
输出:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
ticker 知道可能的消费者暂停,并且默认情况下会调整下⼀个生成的元素的时间,如果发生暂停则延迟,试图保持固定的生成元素率。
异常处理
异常的传播
协程构建器有两种形式:自动传播异常(launch 与 actor)或向用户暴露异常(async 与 produce)。当这些构建器用于创建⼀个根协程时,即该协程不是另⼀个协程的子协程,前者这类构建器将异常视为未捕获异常,类似 Java 的 Thread.uncaughtExceptionHandler,而后者则依赖用户来最终消费异常,例如通过 await 或 receive。
CoroutineExceptionHandler
将未捕获异常打印到控制台的默认行为是可自定义的。
CoroutineExceptionHandler 仅在未捕获的异常上调用 ——没有以其他任何方式处理的异常。特别是,所有子协程(在另⼀个 Job 上下文中创建的协程)委托它们的父协程处理它们的异常,然后它们也委托给其父协程,以此类推直到根协程,因此永远不会使用在其上下文中设置的 CoroutineExceptionHandler 。除此之外,async 构建器始终会捕获所有异常并将其表示在结果 Deferred 对象中,因此它的 CoroutineExceptionHandler 也无效。
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val job = GlobalScope.launch(handler) {// 根协程,运行在 GlobalScope 中
throw AssertionError()
}
val deferred = GlobalScope.async(handler) {// 同样是根协程,但使用 async 代替了 launch
throw ArithmeticException()// 没有打印任何东西,依赖用户去调用 deferred.await()
}
joinAll(job, deferred)
}
输出:
CoroutineExceptionHandler got java.lang.AssertionError
取消与异常
协程内部使用 CancellationException 来进行取消,这个异常会被所有的处理者忽略,所以那些可以被 catch 代码块捕获的异常仅仅应该被用来作为额外调试信息的资源。当⼀个协程使用 Job.cancel 取消的时候,它会被终止,但是它不会取消它的父协程。
如果⼀个协程遇到了 CancellationException 以外的异常,它将使用该异常取消它的父协程。这个行为无法被覆盖,并且用于为结构化的并发(structured concurrency)提供稳定的协程层级结构。
异常聚合
当协程的多个子协程因异常而失败时,⼀般规则是“取第⼀个异常”,因此将处理第⼀个异常。在第⼀个异常之后发生的所有其他异常都作为被抑制的异常绑定至第⼀个异常。
监督
取消是在协程的整个层次结构中传播的双向关系。如子协程异常取消,其父协程子协程也会被取消。
监督作业
SupervisorJob 可用于子协程失败并不需要停止整个作用域上的协程,而父协程失败,需要停止所有子协程。它类似于常规的 Job,唯⼀的不同是:SupervisorJob 的取消只会向下传播。
fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// 启动第⼀个子作业——这个示例将会忽略它的异常(不要在实践中这么做!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("The first child is failing")
throw AssertionError("The first child is cancelled")
}
// 启动第⼆个子作业
val secondChild = launch {
firstChild.join()
// 取消了第⼀个子作业且没有传播给第⼆个子作业
println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// 但是取消了监督的传播
println("The second child is cancelled because the supervisor was cancelled")
}
}
firstChild.join()
println("Cancelling the supervisor")
supervisor.cancel()
secondChild.join()
}
}
输出:
The first child is failing
The first child is cancelled: true, but the second one is still active
Cancelling the supervisor
The second child is cancelled because the supervisor was cancelled
监督作用域
对于作用域的并发,可以用 supervisorScope 来替代 coroutineScope 来实现相同的目的。它只会单向的传播并且当作业自身执行失败的时候将所有⼦作业全部取消。作业自身也会在所有的子作业执行结束前等待。
fun main() = runBlocking {
try {
supervisorScope {
launch {
try {
println("The child is sleeping")
delay(Long.MAX_VALUE)
} finally {
println("The child is cancelled")
}
}
// 使⽤ yield 来给我们的⼦作业⼀个机会来执⾏打印
yield()
println("Throwing an exception from the scope")
throw AssertionError()
}
} catch(e: AssertionError) {
println("Caught an assertion error")
}
}
输出:
The child is sleeping
Throwing an exception from the scope
The child is cancelled
Caught an assertion error
监督协程中的异常
常规的作业和监督作业之间的另⼀个重要区别是异常处理。监督协程中的每⼀个子作业应该通过异常处理机制处理自身的异常。这种差异来自于子作业的执行失败不会传播给它的父作业的事实。这意味着在 supervisorScope 内部直接启动的协程确实使用了设置在它们作用域内的 CoroutineExceptionHandler,与父协程的方式相同。
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
supervisorScope {
launch(handler) {
println("The child throws an exception")
throw AssertionError()
}
println("The scope is completing")
}
println("The scope is completed")
}
输出:
The scope is completing
The child throws an exception
CoroutineExceptionHandler got java.lang.AssertionError
The scope is completed
网友评论