一个 Channel 是一个和 Java 的BlockingQueue
类似
val channel = Channel<Int>()
launch {
// 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
for (x in 1..5) channel.send(x * x)
channel.close() // 我们结束发送
}
// 这里我们打印了 5 次被接收的整数:
repeat(5) { println(channel.receive()) }
println("Done!")
管道
管道是一种一个协程在流中开始生产可能无穷多个元素的模式
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 在流中开始从 1 生产无穷多个整数
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
fun main() = runBlocking {
val numbers = produceNumbers() // 从 1 开始生产整数
val squares = square(numbers) // 对整数做平方
for (i in 1..5) println(squares.receive()) // 打印前 5 个数字
println("Done!") // 我们的操作已经结束了
coroutineContext.cancelChildren() // 取消子协程
}
单个生产者,多个消费者
fun main() = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 取消协程生产者从而将它们全部杀死
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // 产生下一个数字
delay(100) // 等待 0.1 秒
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
多个生产者,单一消费者
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收前六个
println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有子协程来让主协程结束
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
缓冲通道
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(4) // 启动带缓冲的通道
val sender = launch { // 启动发送者协程
repeat(10) {
println("Sending $it") // 在每一个元素发送前打印它们
channel.send(it) // 将在缓冲区被占满时挂起
}
}
// 没有接收到东西……只是等待……
delay(1000)
sender.cancel() // 取消发送者协程
}
// 打印出0,1,2,3,4
// 前四个元素被加入到了缓冲区并且发送者在试图发送第五个元素的时候被挂起。
公平通道
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // 一个共享的 table(桌子)
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // 乒乓球
delay(1000) // 延迟 1 秒钟
coroutineContext.cancelChildren() // 游戏结束,取消它们
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 在循环中接收球
ball.hits++
println("$name $ball")
delay(300) // 等待一段时间
table.send(ball) // 将球发送回去
}
}
记时通道
// 注意,withTimeout 会挂起当前协程
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //创建计时器通道
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // 初始尚未经过的延迟
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // 所有随后到来的元素都经过了 100 毫秒的延迟
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// 模拟大量消费延迟
println("Consumer pauses for 150ms")
delay(150)
// 下一个元素立即可用
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// 请注意,`receive` 调用之间的暂停被考虑在内,下一个元素的到达速度更快
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // 表明不再需要更多的元素
}
网友评论