美文网首页
kotlinx.coroutines Channel

kotlinx.coroutines Channel

作者: Zero_Wrold | 来源:发表于2019-05-22 14:01 被阅读0次

    Channel

    Channel 的概念与BlockingQueue相似两者最大的不同在于,后者阻塞式的put操作变成了挂起等待的send,而阻塞式的take变成了挂起等待的receive

        val channel = Channel<Int>()
        launch {
            // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
            for (x in 1..5) channel.send(x * x)
        }
    // 这里我们打印了 5 次被接收的整数:
        repeat(5) { println(channel.receive()) }
        println("Done!")
    }
    
    /*
    1
    4
    9
    16
    25
    Done!
    */
    

    与队列不同channel可以关闭,这表明没有更多的元素了,在接收者中可以定期的使用for循环从channel 中接收元素

        val channel = Channel<Int>()
        launch {
            // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
            for (x in 1..5) channel.send(x * x)
            channel.close()//这里结束发送
        }
    // 这里我们使用 `for` 循环来打印所有被接收到的元素(直到通道被关闭)
        for (y in channel) println(y)
        println("Done!")
    }
    
    /*
    1
    4
    9
    16
    25
    Done!
    */
    

    构建Channel 生产者

    协程持续生成一列数据的模型很常用,就是生产者——消费者模式的一部分,这在并发代码中很常见。你可以选择把这样一个生产者抽象成一个函数,你可以将生产者抽象成一个函数,并且使channel作为它的参数,但这与必须从函数中返回结果的常识相违悖.
    这里提供了一个 produce 函数,它可以很方便地构造协程,并且使得生产者的操作更为简便,还有一个扩展函数 consumeEach,它替代了消费者的 for 循环

    fun CoroutineScope.produceNumbers() = produce<Int> {
        for (x in 1..5) send(x * x)
    }
    
    fun main(args: Array<String>)= runBlocking<Unit> {
        val squares = produceNumbers()
        squares.consumeEach { println(it) }
        println("Done!")
    }
    /*
    1
    4
    9
    16
    25
    Done!
     */
    

    使用Channel 生成素数

    fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
        var x = start
        while (true) send(x++) // 开启了一个从start开始的无限的整数流
    }
    fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
        for (x in numbers) if (x % prime != 0) send(x)
    }
    fun main(args: Array<String>)= runBlocking<Unit> {
        var cur = numbersFrom(2)
        for (i in 1..10) {
            val prime = cur.receive()
            println(prime)
            cur = filter(cur, prime)
        }
        coroutineContext.cancelChildren() // 取消所有的子协程来让主协程结束
    }
    
    

    cancelChildren方法可以取消所有子协程

    一个生产者,多个消费者

    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 0
        while (true){
            send(x++) // 开启了一个无限的整数流
            delay(100L)//延迟0.1S
        }
    }
    
    fun main(args: Array<String>)= runBlocking<Unit> {
        fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
            channel.consumeEach {
                println("Processor #$id received $it")
            }
        }
        val producer = produceNumbers()
        repeat(5) { launchProcessor(it, producer) }
        delay(950L)
        producer.cancel() // 取消生产者协程,kill 所有
    }
    /**
     * 
     Processor #0 received 0
    Processor #0 received 1
    Processor #1 received 2
    Processor #2 received 3
    Processor #3 received 4
    Processor #4 received 5
    Processor #0 received 6
    Processor #1 received 7
    Processor #2 received 8
    Processor #3 received 9
     */
    

    多个协程可以接收来自同一个channel的数据,它们之间会做分发处理

    多个生产者,一个消费者

    suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
        while (true) {
            delay(time)
            channel.send(s)
        }
    }
    fun main(args: Array<String>)= runBlocking<Unit> {
        val channel = Channel<String>()
        launch { sendString(channel, "foo", 200L) }
        launch { sendString(channel, "BAR!", 500L) }
        repeat(6) { // 接收前六个
            println(channel.receive())
        }
        coroutineContext.cancelChildren() // 取消所有子协程来让主协程结束
    }
    /**
    foo
    foo
    BAR!
    foo
    foo
    BAR!
     */
    

    多个协程也可以往同一个Channel 发送数据

    带缓冲的Channel

    没有缓冲的 channel 会在发送者和接收者都准备好之后进行数据传输,如果 send先调用,那么它会挂起等到 receive也被调用.
    Channelproduce 函数都有一个参数capacity,用于指定缓冲区的大小,。缓冲区允许发送者在挂起等待之前先发送几条数据,这和指定 capacityBlockingQueue 相似,缓冲区满了就会阻塞。

    fun main(args: Array<String>)= runBlocking<Unit> {
        val channel = Channel<Int>(4) // 启动带缓冲的通道
        val sender = launch { // 启动发送者协程
            repeat(10) {
                println("Sending $it") // 在每一个元素发送前打印它们
                channel.send(it) // 将在缓冲区被占满时挂起
            }
        }
    // 没有接收到东西……只是等待……
        delay(1000)
        sender.cancel() // 取消发送者协程
    }
    /**
    Sending 0
    Sending 1
    Sending 2
    Sending 3
    Sending 4
     */
    
    

    Channel 公平

    发送和接收都是公平的,使是从不同的协程调用,它也严格按照调用的顺序分配。采取的原则是先进先出

    data class Ball(var hits: Int)
    
    suspend fun player(name: String, table: Channel<Ball>) {
        for (ball in table) { // 在循环中接收球
            ball.hits++
            println("$name $ball")
            delay(300) // 等待一段时间
            table.send(ball) // 将球发送回去
        }
    }
    fun main(args: Array<String>)= runBlocking<Unit> {
        val table = Channel<Ball>() // 一个共享的 table(桌子)
        launch { player("ping", table) }
        launch { player("pong", table) }
        table.send(Ball(0)) // 乒乓球
        delay(1000) // 延迟 1 秒钟
        coroutineContext.cancelChildren() // 游戏结束,取消它们
    }
    /**
    ping Ball(hits=1)
    pong Ball(hits=2)
    ping Ball(hits=3)
    pong Ball(hits=4)
     */
    
    

    注意,有时因为执行的特性,channel 的生产者的执行并不公平,具体看这个 issue

    计时器Channel

    计时器Channel 是一种特别的会合Channel,每次经过特定的延迟都会从该通道进行消费并产生Unit ,虽然它看起来似乎没用,它被用来构建分段来创建复杂的基于时间的 produce Channel 和进行窗口化操作以及其它时间相关的处理。 可以在 select 中使用计时器Channel 来进行“打勾”操作。
    使用工厂方法ticker来创建Channel, 为了表明不需要其它元素,请使用 ReceiveChannel.cancel方法
    需要注意,ticker 很关注消费者的暂停,默认情况下,如果发生了停顿就会判断下一次生产的元素的延时,你需要试着维护一个固定频率的生产速度。

    相关文章

      网友评论

          本文标题:kotlinx.coroutines Channel

          本文链接:https://www.haomeiwen.com/subject/jhjyzqtx.html