通道
延期的值提供了⼀种便捷的⽅法使单个值在多个协程之间进⾏相互传输。通道提供了⼀种在流中传输值的⽅法。
通道基础
⼀个 Channel
是⼀个和 BlockingQueue
⾮常相似的概念。其中⼀个不同是它代替了阻塞的 put
操作并提供了挂起的send
,还替代了阻塞的 take
操作并提供了挂起的 `receive。
val channel = Channel<Int>()
launch {
// 这⾥可能是消耗⼤量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平⽅并发送
for (x in 1..5) channel.send(x * x)
}
// 这⾥我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")
这段代码的输出如下:
1
4
9
16
25
Done!
关闭与迭代通道
和队列不同,⼀个通道可以通过被关闭来表明没有更多的元素将会进⼊通道。在接收者中可以定期的使⽤ for
循环
来从通道中接收元素。
从概念上来说,⼀个close
操作就像向通道发送了⼀个特殊的关闭指令。这个迭代停⽌就说明关闭指令已经被接收了。
所以这⾥保证所有先前发送出去的元素都在通道关闭前被接收到。
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 我们结束发送
}
// 这⾥我们使⽤ `for` 循环来打印所有被接收到的元素(直到通道被关闭)
for (y in channel) println(y)
println("Done!")
构建通道⽣产者
协程⽣成⼀系列元素的模式很常⻅。这是 ⽣产者⸺消费者 模式的⼀部分,并且经常能在并发的代码中看到它。你可
以将⽣产者抽象成⼀个函数,并且使通道作为它的参数,但这与必须从函数中返回结果的常识相违悖。
这⾥有⼀个名为 produce
的便捷的协程构建器
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
管道
管道是⼀种⼀个协程在流中开始⽣产可能⽆穷多个元素的模式:
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 在流中开始从 1 ⽣产⽆穷多个整数
}
并且另⼀个或多个协程开始消费这些流,做⼀些操作,并⽣产了⼀些额外的结果。在下⾯的例⼦中,对这些数字仅仅做
了平⽅操作:
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
主要的代码启动并连接了整个管道:
val numbers = produceNumbers() // 从 1 开始⽣成整数
val squares = square(numbers) // 整数求平⽅
repeat(5) {
println(squares.receive()) // 输出前五个
}
println("Done!") // ⾄此已完成
coroutineContext.cancelChildren() // 取消⼦协程
使⽤管道的素数
让我们来展⽰⼀个极端的例⼦⸺在协程中使⽤⼀个管道来⽣成素数。我们开启了⼀个数字的⽆限序列。
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // 开启了⼀个⽆限的整数流
}
下⾯的例⼦打印了前⼗个素数,在主线程的上下⽂中运⾏整个管道。直到所有的协程在该主协程 runBlocking
的作⽤
域中被启动完成。我们不必使⽤⼀个显式的列表来保存所有被我们已经启动的协程。我们使⽤ cancelChildren
扩展
函数在我们打印了前⼗个素数以后来取消所有的⼦协程。
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // 取消所有的⼦协程来让主协程结束
注意,你可以在标准库中使⽤ iterator
协程构建器来构建⼀个相似的管道。使⽤ iterator
替换
produce
、yield
替换 send
、next 替换 receive
、Iterator
替换ReceiveChannel
来摆脱协程作⽤域,
你将不再需要 runBlocking
。然⽽,如上所⽰,如果你在 Dispatchers.Default
上下⽂中运⾏它,使⽤通道的管道的
好处在于它可以充分利⽤多核⼼ CPU。
扇出
多个协程也许会接收相同的管道,在它们之间进⾏分布式⼯作。让我们启动⼀个定期产⽣整数的⽣产者协程(每秒⼗
个数字):
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // 从 1 开始
while (true) {
send(x++) // 产⽣下⼀个数字
delay(100) // 等待 0.1 秒
}
}
接下来我们可以得到⼏个处理器协程。在这个⽰例中,它们只是打印它们的 id 和接收到的数字:
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
现在让我们启动五个处理器协程并让它们⼯作将近⼀秒。看看发⽣了什么:
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 取消协程⽣产者从⽽将它们全部杀死
该输出将类似于如下所⽰,尽管接收每个特定整数的处理器 id 可能会不同:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意,取消⽣产者协程将关闭它的通道,从⽽最终终⽌处理器协程正在执⾏的此通道上的迭代。
还有,注意我们如何使⽤for
循环显式迭代通道以在 launchProcessor
代码中执⾏扇出。与 consumeEach
不
同,这个for
循环是安全完美地使⽤多个协程的。如果其中⼀个处理器协程执⾏失败,其它的处理器协程仍然会继续
处理通道,⽽通过 consumeEach
编写的处理器始终在正常或⾮正常完成时消耗(取消)底层通道
扇⼊
多个协程可以发送到同⼀个通道。⽐如说,让我们创建⼀个字符串的通道,和⼀个在这个通道中以指定的延迟反复发
送⼀个指定字符串的挂起函数:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
现在,我们启动了⼏个发送字符串的协程,让我们看看会发⽣什么(在⽰例中,我们在主线程的上下⽂中作为主协程的
⼦协程来启动它们):
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收前六个
println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有⼦协程来让主协程结束
输出如下:
foo
foo
BAR!
foo
foo
BAR!
带缓冲的通道
到⽬前为⽌展⽰的通道都是没有缓冲区的。⽆缓冲的通道在发送者和接收者相遇时传输元素(也称“对接”)。如果发送
先被调⽤,则它将被挂起直到接收被调⽤,如果接收先被调⽤,它将被挂起直到发送被调⽤。
Channel()
⼯⼚函数与 produce
建造器通过⼀个可选的参数 capacity
来指定 缓冲区⼤⼩ 。缓冲允许发送者在被
挂起前发送多个元素,就像 BlockingQueue
有指定的容量⼀样,当缓冲区被占满的时候将会引起阻塞。
看看如下代码的表现:
val channel = Channel<Int>(4) // 启动带缓冲的通道
val sender = launch { // 启动发送者协程
repeat(10) {
println("Sending $it") // 在每⼀个元素发送前打印它们
channel.send(it) // 将在缓冲区被占满时挂起
}
}
// 没有接收到东西……只是等待……
delay(1000)
sender.cancel() // 取消发送者协程
使⽤缓冲通道并给 capacity
参数传⼊ 四 它将打印sending
五 次:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
前四个元素被加⼊到了缓冲区并且发送者在试图发送第五个元素的时候被挂起。
通道是公平的
发送和接收操作是 公平的 并且尊重调⽤它们的多个协程。它们遵守先进先出原则,可以看到第⼀个协程调⽤
receive
并得到了元素。在下⾯的例⼦中两个协程“乒”和“乓”都从共享的“桌⼦”通道接收到这个“球”元素。
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // ⼀个共享的 table(桌⼦)
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // 乒乓球
delay(1000) // 延迟 1 秒钟
coroutineContext.cancelChildren() // 游戏结束,取消它们
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 在循环中接收球
ball.hits++
println("$name $ball")
delay(300) // 等待⼀段时间
table.send(ball) // 将球发送回去
}
}
“乒”协程⾸先被启动,所以它⾸先接收到了球。甚⾄虽然“乒”协程在将球发送会桌⼦以后⽴即开始接收,但是球还是
被“乓”协程接收了,因为它⼀直在等待着接收球:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
注意,有时候通道执⾏时由于执⾏者的性质⽽看起来不那么公平
网友评论