美文网首页
继续来,同我一起撸Kotlin Channel 深水区

继续来,同我一起撸Kotlin Channel 深水区

作者: 进击的老六 | 来源:发表于2023-10-22 14:03 被阅读0次

    前言

    本篇文章将着重分析协程间的通信方式。通过本篇文章,你将了解到:

    1. Channel的引入及简单使用
    2. Channel的原理
    3. Channel四种类型深入解析
    4. produce/actor的使用与原理

    1. Channel的引入及简单使用

    初级版协程间通信

    先来看一个简单的通信Demo:

        fun testChannel() {
            //协程1
            var deferred= GlobalScope.async {
                //假装在加工数据
                Thread.sleep(2000)
                "Hello fishforest"
            }
            //协程2
            GlobalScope.launch {
                var result = deferred.await()
                println("get result from coroutine1: $result")
            }
        }
    

    如上,协程2拿到了协程1的值,这就是一次简单的协程间通信过程。
    现在需求变了,协程1一直在生产数据,协程2也需要不断地从中取数据,此时靠async/await 配合无能为力了。当然,我们很容易想到的方案是:

    共享一个变量,这个变量可以是个队列。

    于是Demo改造如下:

        fun testChannel2() {
            //阻塞队列
            var queue = ArrayBlockingQueue<String>(5)
            //协程1
            GlobalScope.launch {
                var count = 0
                while (true) {
                    //假装在加工数据
                    Thread.sleep(1000)
                    queue.put("fish ${count++}")
                }
            }
    
            //协程2
            GlobalScope.launch {
                while (true) {
                    Thread.sleep(1000)
                    println("get result from coroutine1:${queue.take()}")
                }
            }
        }
    

    通过阻塞队列,当协程2取数据时,如果队列是空,那么等待协程1往队列里放数据;当协程1放数据时,如果队列满了,那么等待协程2从队列里取出数据。如此,就是简单的协程通信。
    看似美好,实际上此处有个很大的漏洞:

    队列满/队列空 时,此时等待动作阻塞的是线程,而我们知道协程的挂起并不阻塞线程,因此此种方式并没有利用到协程的优势。

    我们期望协程发现队列满/空时将自己挂起等待,此时就引入了Channel。

    升级版协程间通信-Channel

    同样的需求,我们用Channel 实现:

        fun testChannel3() {
            //定义Channel
            var channel = Channel<String>()
            //协程1
            GlobalScope.launch {
                var count = 0
                while (true) {
                    //假装在加工数据
                    Thread.sleep(1000)
                    var sendStr = "fish ${count++}"
                    println("send $sendStr")
                    channel.send("$sendStr")
                }
            }
    
            //协程2
            GlobalScope.launch {
                while (true) {
                    Thread.sleep(1000)
                    println("receive:${channel.receive()}")
                }
            }
        }
    

    与之前的实现方案相比,仅仅只是将队列换成了Channel,可以看出,Channel 和队列比较类似,而Channel的send/recevie 函数并没有阻塞线程,仅仅只是挂起了协程。
    查看打印结果:

    你可能发现了端倪:发送者和接收者是成对出现的,难道Channel的内部实现不是队列?
    要想解开这个谜题,最好的方法是从源码入手深究其原理。

    2. Channel的原理

    Channel的构造

    先从Channel 构造开始:

    #Channel.kt
    public fun <E> Channel(
        //Channel 容量/叫做Channel类型更合理一些
        capacity: Int = Channel.RENDEZVOUS,
        //缓冲区满后,发送端的处理方式
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
        //信息没有传递出去时的回调
        onUndeliveredElement: ((E) -> Unit)? = null
    ): Channel<E> =
        when (capacity) {
            //默认是约会模式
            Channel.RENDEZVOUS -> {
                //默认挂起
                if (onBufferOverflow == BufferOverflow.SUSPEND)
                    RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
                else
                    ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
            }
            //...
        }
    

    此处的Channel() 并不是构造函数,而是顶层函数,Kotlin里有很多伪装为构造函数的顶层函数。该顶层函数默认构造并返回RendezvousChannel类型的Channel。
    RendezvousChannel 类本身很简单,就重写了一些属性,它继承自AbstractChannel。

    重点在AbstractChannel/AbstractSendChannel及其子类里。

    Channel的队列结构

    AbstractSendChannel 里有个很重要的成员变量:

        protected val queue = LockFreeLinkedListHead()
    

    LockFreeLinkedListHead 继承自LockFreeLinkedListNode,而这个Node 我们在分析Kotlin 协程之取消与异常处理探索之旅(上) 有提及过,此处再拎出来说说。
    先看其定义:

    #LockFreeLinkedList.kt
    public actual open class LockFreeLinkedListNode {
        //后驱指针
        private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor
        //前驱指针
        private val _prev = atomic(this) 
        //...
    }
    

    很典型的一个链表结构,并且是无锁链表,意思是它的插入/删除是无需上锁的,核心是使用了CAS。 回到Channel里的成员变量queue,初始的链表结构如下:

    可以看出,当前节点的_next、_prev分别指向自己。

    当往链表里面添加Node时,形成如下结构:

    链表头为固定节点,通过它构造了双向循环链表。
    AbstractSendChannel 里的queue 就是个链表头,通过它我们可以快速找到链表里的第1个节点(_next 指向的节点),也可以快速找到链表的最后一个节点(_prev指向的节点)。
    于是形成了一个队列结构,每次往队列里放入数据,就放到链表的尾部,每次从队列里取数据,就从链表头后的第一个节点取。

    Channel的send/receive

    send 分析

    #AbstractChannel.kt
        public final override suspend fun send(element: E) {
            //快速判断是否可以放入queue 队列
            //若能成功,则直接返回
            if (offerInternal(element) === kotlinx.coroutines.channels.OFFER_SUCCESS) return
            //不能退出,则挂起协程
            return sendSuspend(element)
        }
    
        protected open fun offerInternal(element: E): Any {
            while (true) {
                //先找到队列第一个Node节点,如果存在并且是Receive 类型,说明有接收者在等待
                val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
                //给接收者协程赋值
                val token = receive.tryResumeReceive(element, null)
                if (token != null) {
                    kotlinx.coroutines.assert { token === RESUME_TOKEN }
                    //重新恢复接收者协程
                    receive.completeResumeReceive(element)
                    //返回插入的结果
                    return receive.offerResult
                }
            }
        }
    
        private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
            //suspendCancellableCoroutineReusable 里有挂起协程的逻辑
            loop@ while (true) {
                if (isFullImpl) {
                    //构造SendElement,它是Node类型 
                    val send = if (onUndeliveredElement == null)
                        //SendElement 有两个成员变量:1是具体的值,2是当前协程的封装体cont
                        SendElement(element, cont) else
                        SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
                    //将Element 加入到队列尾部
                    val enqueueResult = enqueueSend(send)
                    //插入成功,则返回
                }
            }
        }
    

    用图表示以上流程:

    接收者协程被恢复后,重新调度执行协程,而传入的值即为send发送的值,最终recevie返回的即是send过来的值。
    对协程的挂起有疑惑请移步:讲真,Kotlin 协程的挂起没那么神秘(原理篇)

    receive 分析
    与send流程类似,就不贴代码了,仅用图表示:

    可以看出,send/receive 通过判断queue的状态来决定是否挂起当前协程,而queue里的Node 又分为三种类型:

    [图片上传失败...(image-51eac8-1698040950561)]

    综合以上得出:

    在RENDEZVOUS类型(默认类型)下,发送者协程需要等待接收者就位了(到队列里等待)才会继续往下走。同样的,接收者协程需要等待发送者就位了(到队列里等待)才会继续往下走。因此,形成的现象是发送者/接收者成对出现。

    成对出现的场景,我们称RENDEZVOUS 为约会类型。

    3. Channel四种类型深入解析

    CONFLATED 类型

    前面的分析是基于约会类型,实际上Channel还有其它类型,通过其构造过程可看出:

    #AbstractChannel.kt
    public fun <E> Channel(
        capacity: Int = Channel.RENDEZVOUS,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
        onUndeliveredElement: ((E) -> Unit)? = null
    ): Channel<E> =
        when (capacity) {
            //约会类型
            Channel.RENDEZVOUS -> {
                if (onBufferOverflow == BufferOverflow.SUSPEND)
                    RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
                else
                    //转为缓冲类型
                    ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
            }
            //混合类型
            Channel.CONFLATED -> {
                //此种类型下必须是挂起模式
                require(onBufferOverflow == BufferOverflow.SUSPEND) {
                    "CONFLATED capacity cannot be used with non-default onBufferOverflow"
                }
                ConflatedChannel(onUndeliveredElement)
            }
            //无限制类型
            Channel.UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
            //缓冲类型
            Channel.BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
                if (onBufferOverflow == BufferOverflow.SUSPEND) Channel.CHANNEL_DEFAULT_CAPACITY else 1,
                onBufferOverflow, onUndeliveredElement
            )
            //没有指定具体类型是以上4种内的组合
        }
    

    先看CONFLATED(混合)类型。
    ConflatedChannel 继承自AbstractChannel,有个成员变量:value。
    重点来看其重写的函数:offerInternal与pollInternal,分别对应send与receive的逻辑。

    send 分析

    #ConflatedChannel.kt
    protected override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        //先上锁
        lock.withLock {
            //如果value 为空,也就是之前没有发送过,说明可能有接收者在等待。
            if (value === kotlinx.coroutines.channels.EMPTY) {
                loop@ while(true) {
                    //尝试取出接收者
                    receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                    if (receive is Closed) {
                        //是关闭Node,直接返回
                        return receive!!
                    }
                    //赋值给接收者协程
                    val token = receive!!.tryResumeReceive(element, null)
                    if (token != null) {
                        //跳出锁
                        return@withLock
                    }
                }
            }
            //更新发送值到value里
            updateValueLocked(element)?.let { throw it }
            //成功插入
            return OFFER_SUCCESS
        }
        //如果找到接收者,则恢复接收者协程
        receive!!.completeResumeReceive(element)
        return receive!!.offerResult
    }
    

    receive 分析

    #ConflatedChannel.kt
    protected override fun pollInternal(): Any? {
        var result: Any? = null
        //上锁
        lock.withLock {
            //如果value 为空,说明没数据,取数据失败
            if (value === kotlinx.coroutines.channels.EMPTY) return closedForSend ?: POLL_FAILED
            //从value 里取数据
            result = value
            //恢复到无数据状态
            value = EMPTY
        }
        return result
    }
    

    由此可见:

    在 CONFLATED类型下,发送者无需等待接收者就位,它可以一直更新数据。

    BUFFERED 类型

    此为缓冲类型,与其它类型最大的不同之处在于它内部有数据缓冲区。
    ArrayChannel 继承自AbstractChannel,其成员变量:

    //数据缓冲区
        private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8)).apply { fill(EMPTY) }
    

    重点函数还是offerInternal与pollInternal。
    send 分析

    #ArrayChannel.kt
    protected override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        lock.withLock {
            //size 为buffer 当前的实际存储数据的个数
            val size = this.size.value
            //更新size,此处根据发送策略,有可能会直接退出
            updateBufferSize(size)?.let { return it }
            if (size == 0) {
                //当前缓冲区没有数据
                loop@ while (true) {
                    //查看是否有接收者等待
                    receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                    //给接收者协程赋值
                    val token = receive!!.tryResumeReceive(element, null)
                    if (token != null) {
                        //缓冲区数量不变
                        this.size.value = size
                        return@withLock
                    }
                }
            }
            //加入到缓冲队列
            enqueueElement(size, element)
            //插入成功
            return OFFER_SUCCESS
        }
        //恢复接收者协程
        receive!!.completeResumeReceive(element)
        return receive!!.offerResult
    }
    
    private fun updateBufferSize(currentSize: Int): Symbol? {
        if (currentSize < capacity) {
            //还可以继续存放数据
            size.value = currentSize + 1 // tentatively put it into the buffer
            return null // proceed
        }
        //缓冲区满
        return when (onBufferOverflow) {
            //协程需要挂起
            BufferOverflow.SUSPEND -> OFFER_FAILED
            //舍弃最新数据,相当于发送永远是成功的
            BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
            //舍弃旧的数据,发送继续走下面的流程
            BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
        }
    }
    

    receive 分析
    receive 过程与send类似,就不贴源码了,直接上图对比:

    可以看出,对于发送者来说:

    先将数据放入数据缓冲队列,当缓冲区满后才会考虑是否需要挂起发送者协程。同样的,对于接收者来说,先从缓冲队列取数据,当缓冲区没数据时才会挂起自身。

    UNLIMITED 类型

    此类型为无限制类型,网上一些文章将此与BUFFERED类型类比,并归为“无限缓冲类型”,该说法是否正确,接下来一步步印证。
    同样的,LinkedListChannel继承自AbstractChannel。
    重点函数还是offerInternal与pollInternal。
    send 分析

    #LinkedListChannel.kt
    protected override fun offerInternal(element: E): Any {
        while (true) {
            //快速查找是否有接收者等待
            val result = super.offerInternal(element)
            when {
                //找到接收者,插入算是成功
                result === OFFER_SUCCESS -> return OFFER_SUCCESS
                //没找到
                result === OFFER_FAILED -> {
                    //加入到协程缓冲队列
                    when (val sendResult = sendBuffered(element)) {
                        null -> return OFFER_SUCCESS
                        is Closed<*> -> return sendResult
                    }
                }
            }
        }
    }
    
    protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
        //将SendBuffered 加入到queue里(队尾)
        queue.addLastIfPrev(AbstractSendChannel.SendBuffered(element)) { prev ->
            if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
            true
        }
        //添加成功
        return null
    }
    

    receive 分析 receive 过程完全依靠父类AbstractChannel完成,此处就不再赘述,用图表示:

    可以看出:

    此类型下,发送者不会挂起,会一直往队列里存放数据,理论上是可以无限制存放的。与BUFFERED类型不同的是,UNLIMITED 缓冲数据使用的是queue,它是链表。而BUFFERED 缓冲数据使用的是数组。

    Channel 四种类型比对

    对于接收者来说,只有一种逻辑:

    有数据则消费数据,没数据则挂起等待。

    4. produce/actor的使用与原理

    使用

    通过上面的分析,我们知道接收者有可能会阻塞,怎样才能让接收者知道数据已经发送完毕了呢?
    答案是:Channel.close()。
    当调用该函数时,会往queue里加入Closed节点,当send/receive 取出该节点时就知道Channel关闭了。
    你说,能不能不手动调用该函数呢?刚好,produce可以解决该问题:

        fun testProduce() {
            //返回接收者
            var receiveChannel = GlobalScope.produce<String> {
                //
                for (x in 1..5) {
                    var sendStr = "fish $x"
                    println("send $sendStr")
                    send("$sendStr")
                }
            }
            //接收数据
            GlobalScope.launch {
                while (true) {
                    println("job2 receive:${receiveChannel.receive()}")
                }
                println("job2 end")
            }
            GlobalScope.launch {
                while (true) {
                    println("job3 receive:${receiveChannel.receive()}")
                }
                println("job3 end")
            }
        }
    

    produce 函数返回Channel,在produce的协程体里可以发送数据,而通过返回的Channel,其它协程可以接收数据。当produce协程执行完毕后,将会主动调用close关闭Channel,其它Receive的Channel就会有感知,从而退出挂起状态。
    这是一个典型的单生产者--多消费者的模型。
    反之单消费者--多生产者的模型如下:

        fun testActor() {
            //返回发送者
            var sendChannel = GlobalScope.actor<String> {
                //
                for (x in 1..5) {
                    println("job1 receive:${receive()}")
                }
                println("actor end")
            }
            //发送者
            GlobalScope.launch {
                sendChannel.send("send from job2")
            }
            GlobalScope.launch {
                sendChannel.send("send from job3")
            }
        }
    

    原理

    produce和actor内部创建了RENDEZVOUS 类型的Channel,它们返回的Channel以及协程体里的Channel都是委托这个内部的Channel来完成功能的,并且Channel绑定了协程的生命周期,当协程取消时将会取消Channel。(由于篇幅原因,就不展开细说了,有兴趣可以自行阅读源码或是留言)。

    更多Kotlin可以查看我的个人介绍!!!

    相关文章

      网友评论

          本文标题:继续来,同我一起撸Kotlin Channel 深水区

          本文链接:https://www.haomeiwen.com/subject/zioridtx.html