1.通道基础
Deferred提供了协程直接传递单个值的方法,而通道Channels提供了一种传输值流的方法,通过Channels类似BlockingQueue,她们之间关键的区别在于:通道是有一个挂起的send函数和一个挂起的receive函数。而BlockingQueue是一个阻塞的put操作和take操作
package com.example.kotlin01
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
val chanel = Channel<Int>()
launch {
for (x in 1..5) {
chanel.send(x)
}
}
repeat(4){ println(chanel.receive())}
}
1
2
3
4
以上结果,每次Receive只获取到一个值。重复四次打印了4次
2.关闭通道和迭代通道
与队列不同的是通道可以关闭,以此来表明元素发送完成,在接收方使用常规的for循环从通道接受元素。其实close类似于向通道发送一个特殊的标记close。一旦收到这个标记,迭代就会停止。因此可以保证接收到close之前发送的所有数据
- 没有发送close标记
package com.example.kotlin01
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
val chanel = Channel<Int>()
launch {
for (x in 1..5) {
chanel.send(x)
}
// chanel.close()
}
for (y in chanel){
println(y)
}
println("done")
}
1
2
3
4
5
- 发送close标记
package com.example.kotlin01
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
val chanel = Channel<Int>()
launch {
for (x in 1..5) {
chanel.send(x)
}
chanel.close()
}
for (y in chanel){
println(y)
}
println("done")
}
1
2
3
4
5
done
3.构建通道生产者(Building channel producers)
package com.example.kotlin01
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
//消费者
val squares = product()
squares.consumeEach {
println(it)
}
}
//生产者
fun CoroutineScope.product():ReceiveChannel<Int> = produce {
for(x in 1..5){
send(x)
}
}
1
2
3
4
5
网友评论