美文网首页
kotlin Channel通道

kotlin Channel通道

作者: Bfmall | 来源:发表于2023-08-10 15:11 被阅读0次

    (1)基本用法

    Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信。
    生产者/消费者模式 (send - channel - receive)

    Channel的基本用法如下:

    runBlocking {
    
        val channel = Channel<Int>()
        // 生产者
        val producer = GlobalScope.launch {
            var i = 0
            while(true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }
    
        // 消费者
        val consumer = GlobalScope.launch {
            while(true) {
                val element = channel.receive()
                println("receive $element")
            }
        }
    
        joinAll(producer, consumer)
    
    }
    

    (2)Channel的容量

    Channel实际上就是一个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,
    并且也一直没有人调用receive取走数据,send就需要挂起。
    故意让接收端的节奏放慢,发现send总是会挂起,直到receive之后才会继续往下执行。

    Channel的默认大小为0。

    (3)迭代Channel

    Channel本身确实像序列,所以我们在读取的时候可以直接获取一个Channel的iterator。

    runBlocking {
    
        val channel = Channel<Int>(Channel.UNLIMITED)
        // 生产者
        val producer = GlobalScope.launch {
            for (x in 1..5) {
                println("send ${x * x}")
                channel.send(x * x)
            }
        }
    
        // 消费者
        val consumer = GlobalScope.launch {
            val iterator = channel.iterator()
            while(iterator.hasNext()) {
                val element = iterator.next()
                println("receive $element")
                delay(1000)
            }
        }
    
        joinAll(producer, consumer)
    
    }
    

    消费者代码也可以改成:

        // 消费者
        val consumer = GlobalScope.launch {
            for (element in channel) {
                println("receive $element")
                delay(1000)
            }
        }
    

    (4)produce与actor

    构造生产者与消费者的便捷方法。
    我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel,其他协程就可以用这个Channel来接收数据了。
    反过来,我们可以用actor启动一个消费者协程。

    produce演示:

    runBlocking {
    
        val receiveChannel = GlobalScope.produce<Int> {
            repeat(100) {
                delay(1000)
                send(it)
            }
        }
    
        // 消费者
        val consumer = GlobalScope.launch {
            for (element in receiveChannel) {
                println("receive $element")
            }
        }
        consumer.join()
    }
    

    actor演示:

    runBlocking {
    
        // 消费者
        val sendChannel = GlobalScope.actor<Int> {
            while (true) {
                val element = receive()
                println("receive $element")
            }
        }
    
        // 生产者
        val producer = GlobalScope.launch {
            repeat(100) {
                sendChannel.send(it)
                delay(1000)
            }
        }
        producer.join()
    }
    

    (5)Channel关闭

    produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel才被称为 热数据流。
    对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时它的 isClosedForSend 会立即返回true,而由于Channel缓冲区的存在,这时可能还有一些元素没有处理完,因此要等所有的元素都读取之后 isClosedForReceive 才会返回true。

    Channel的生命周期最好由主导方来维护,建议 由主导的一方实现关闭。

    runBlocking {
    
        val channel = Channel<Int>(3)
        // 生产者
        val producer = GlobalScope.launch {
            List(3) {
               channel.send(it)
               println("send $it")
            }
            channel.close()
            println("producer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
        }
    
        // 消费者
        val consumer = GlobalScope.launch {
            for (element in channel) {
                println("receive:$element")
                delay(1000)
            }
            println("consumer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
        }
        joinAll(producer, consumer)
    }
    

    (6)BroadcastChannel

    正常情况下,一个 发送者 对应着一个 接收者。
    使用 BroadcastChannel 可以存在多个接收者。

    runBlocking {
    
        val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        // 生产者
        val producer = GlobalScope.launch {
            List(3) {
                delay(1000)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }
    
        List(3) { index->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (element in receiveChannel) {
                    delay(1000)
                    println("[#${index}] receive:$element")
                }
            }
        }.joinAll()
    
        producer.join()
    }
    

    BroadcastChannel<Int>(Channel.BUFFERED) 可以改成 Channel<Int>().broadcast(Channel.BUFFERED)。

    (7)await多路复用

    什么是多路复用?
    数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效地利用通信线路,希望 一个信道同时传输多路信号,这就是所谓多路复用技术。

    复用多个await?
    两个API分别从网络和本地缓存 获取数据,期望哪个先返回就先用哪个做展示。

    request->server->response--
                             ----Select -> Response
    request->server->response--
    
    data class User(val name: String)
    data class Response<T>(val value: T, val isLocal: Boolean)
    suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
        delay(1000)
        User(name)
    }
    
    suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
        delay(100)
        User(name)
    }
    
    fun main() {
        runBlocking {
    
            GlobalScope.launch {
                val localRequest = getUserForLocal("xxx")
                val remoteRequest = getUserFromRemote("yyy")
                // select 选择执行
                val userResponse = select<Response<User>> {
                    localRequest.onAwait { Response(it, true) }
                    remoteRequest.onAwait { Response(it, false) }
                }
                println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
            }.join()
    
        }
    }
    

    select:谁先返回,就选择谁。

    (8)复用多个Channel

    fun main() {
        runBlocking {
    
            val channels = listOf(Channel<Int>(), Channel<Int>())
            GlobalScope.launch {
                delay(100)
                channels[0].send(200)
            }
            GlobalScope.launch {
                delay(50)
                channels[0].send(100)
            }
            val result = select<Int?> {
                channels.forEach { channel ->
                    channel.onReceive { it }
                }
            }
            println(result)
    
        }
    }
    

    (9)SelectClause

    我们怎么知道哪些事件可以被select呢? 其实所有能够被select的事件都是selectClauseN 类型,包括:

    selectClause0:对应事件没有返回值,例如join没有返回值,那么onJoin就是SelectClauseN类型。使用时,onJoin的参数是一个无参函数。
    selectClause1:对应事件有返回值,例如:onAwait和onReceive都是此类情况。
    selectClause2:对应事件有返回值,此外还需要一个额外的参数,例如Channel.onSend有两个参数,第一个是具体的数据,第二个参数是发送成功的回调参数。
    -> 如果我们想要确认挂起函数是否支持select,只需要查看其是否存在对应的SelectClauseN类型可回调即可。

    selectClause0举例:

    fun main() {
        runBlocking {
            val job1 = GlobalScope.launch {
                delay(100)
                println("job 1")
            }
            val job2 = GlobalScope.launch {
                delay(10)
                println("job 2")
            }
            select<Unit> {
                job1.onJoin { println("job1 onJoin") }
                job2.onJoin { println("job2 onJoin") }
            }
        }
    }
    

    selectClause1举例:

    data class User(val name: String)
    data class Response<T>(val value: T, val isLocal: Boolean)
    suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
        delay(1000)
        User(name)
    }
    
    suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
        delay(100)
        User(name)
    }
    
    fun main() {
        runBlocking {
    
            GlobalScope.launch {
                val localRequest = getUserForLocal("xxx")
                val remoteRequest = getUserFromRemote("yyy")
                // select 选择执行
                val userResponse = select<Response<User>> {
                    localRequest.onAwait { Response(it, true) }
                    remoteRequest.onAwait { Response(it, false) }
                }
                println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
            }.join()
    
        }
    }
    
    fun main() {
        runBlocking {
    
            val channels = listOf(Channel<Int>(), Channel<Int>())
            GlobalScope.launch {
                delay(100)
                channels[0].send(200)
            }
            GlobalScope.launch {
                delay(50)
                channels[0].send(100)
            }
            val result = select<Int?> {
                channels.forEach { channel ->
                    channel.onReceive { it }
                }
            }
            println(result)
    
        }
    }
    

    selectClause2举例:

    fun main() {
        runBlocking {
            val channels = listOf(Channel<Int>(), Channel<Int>())
            println(channels)
            launch(Dispatchers.IO) {
                select<Unit> {
                    launch {
                        delay(10)
                        channels[1].onSend(200) { sentChannel->
                            println("sent on $sentChannel")
                        }
                    }
                    launch {
                        delay(100)
                        channels[0].onSend(100) { sentChannel->
                            println("sent on $sentChannel")
                        }
                    }
                }
            }
            GlobalScope.launch {
                println(channels[0].receive())
            }
            GlobalScope.launch {
                println(channels[1].receive())
            }
        }
    }
    

    作者:NoBugException
    链接:https://www.jianshu.com/p/958d38e76cc6
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    相关文章

      网友评论

          本文标题:kotlin Channel通道

          本文链接:https://www.haomeiwen.com/subject/oikzpdtx.html