Kotlin协程源码分析(二)之Channel

作者: LSteven | 来源:发表于2019-02-19 21:24 被阅读13次

    这章理一下channel,先分享一句学习时候看到的话:Do not communicate by sharing memory; instead, share memory by communicating.。本来好像是用在go上的,但也有着异曲同工之妙啊

    channel顾名思义是管道,有入口与出口。因此最底层有sendChannel&receiveChannel

    produce

    Produce = Coroutine + Channel

    example:

    val channel: ReceiveChannel<Int> = produce<Int>(CommonPool) {
        for (i in 0 .. 100) {
            delay(1000)
            channel.send(i)
        }
    }
    
    launch(UI) {
        for (number in channel) {
            textView.text = "Latest number is $number"
        }
    }
    

    produce也是产生协程,跟普通的launch不同他会返回一个receiveChannel,后面会看到receiveChannel是一个迭代器,同时会suspendhasNext和next()上,因此另一个协程就可以使用for...in...等待接受。

    @ExperimentalCoroutinesApi
    public fun <E> CoroutineScope.produce(
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = 0,
        @BuilderInference block: suspend ProducerScope<E>.() -> Unit
    ): ReceiveChannel<E> {
        val channel = Channel<E>(capacity)
        val newContext = newCoroutineContext(context)
        val coroutine = ProducerCoroutine(newContext, channel)
        coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
        return coroutine
    }
    

    同时,produce发射完成后是会自己关闭的,省的我们自己关闭信道:

    override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
        val cause = (state as? CompletedExceptionally)?.cause
        val processed = _channel.close(cause)
        if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause)
    }
    

    通过jobinvokeOnCompletion实现。

    actor

    example

    val channel: SendChannel<View> = actor(UI) {
        for (number in channel) {
            textView.text = "A new click happend!"
        }
    }
    
    button.setOnClickListener {
        launch(CommonPool) {
            channel.send(it)
        }
    }
    

    produce相反返回sendChannel

    高级用法

    fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
        // launch one actor to handle all events on this node
        val eventActor = GlobalScope.actor<MouseEvent>(Dispatchers.Main) {
            for (event in channel) action(event) // pass event to action
        }
        // install a listener to offer events to this actor
        onMouseClicked = EventHandler { event ->
            eventActor.offer(event)
        }
    }
    

    我们看这里用了offer而不是send,我们可以把for..in..先简单的写成以下形式:

    while(iterator.hasNext()){ //suspend fuction
        val event = iterator.next() //suspend function
        action(event)
    }
    
    
    private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine sc@ { cont ->
        val receive = ReceiveHasNext(this, cont)
        while (true) {
            if (channel.enqueueReceive(receive)) {
                channel.removeReceiveOnCancel(cont, receive)
                return@sc
            }
            // hm... something is not right. try to poll
            val result = channel.pollInternal()
            this.result = result
            if (result is Closed<*>) {
                if (result.closeCause == null)
                    cont.resume(false)
                else
                    cont.resumeWithException(result.receiveException)
                return@sc
            }
            if (result !== POLL_FAILED) {
                cont.resume(true)
                return@sc
            }
        }
    }        
    

    假设队列里没有东西时,enqueue一个receiveHasNext进行等待。过会解释一下channel的原理。现在只要知道,当有sender.send时,与receive关联的cont就会被调用resume,那么显而易见,当action正在处理时队列中没有receiver,而offer是不会suspend的,因此事件就被抛弃。

    conflation事件合并
    fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
        // launch one actor to handle all events on this node
        val eventActor = GlobalScope.actor<MouseEvent>(Dispatchers.Main, capacity = Channel.CONFLATED) { // <--- Changed here
            for (event in channel) action(event) // pass event to action
        }
        // install a listener to offer events to this actor
        onMouseClicked = EventHandler { event ->
            eventActor.offer(event)
        }
    }
    

    这里我们使用CONFALTED,即合并所有事件,因此接受者永远处理最近一个。原理如下:

    result === OFFER_FAILED -> { // try to buffer
        val sendResult = sendConflated(element)
        when (sendResult) {
            null -> return OFFER_SUCCESS
            is Closed<*> -> {
                conflatePreviousSendBuffered(sendResult)
                return sendResult
            }
        }
        // otherwise there was receiver in queue, retry super.offerInternal
    }
    

    offer失败时需要suspend等待,(说明还没有接受者或者人家正忙着),插入sendBuffered,同时移除前面已有的sendBuffered

     var prev = node.prevNode
        while (prev is SendBuffered<*>) {
            if (!prev.remove()) {
                prev.helpRemove()
            }
            prev = prev.prevNode
        }
    

    这样永远是最近一个生效。

    大概channel原理

    其实看abstractChannel会先看到一个queue,这时候显而易见会把它当做是像linkedlist那种塞数据的地方。但其实queue是用来放receive/send node。当队列为空时,send时会先从队列取第一个receiveNode,取不到就suspend,把自己当成sendNode放入;不然就把数据直接交给receiveNode

    具体channel实现时,例如ArrayChannel(buffer),会多加一个buffer队列,当队列为空时,send时会先从队列取第一个receiveNode,取不到就放入buffer队列,如果buffer队列满了,把自己当成sendNode放入就suspend;同时把不然就把数据直接交给receiveNode

    select

    参考

    suspend fun selectInsult(john: ReceiveChannel<String>, mike: ReceiveChannel<String>) {
        select<Unit> { //  <Unit> means that this select expression does not produce any result
            john.onReceive { value ->  // this is the first select clause
                println("John says '$value'")
            }
            mike.onReceive { value ->  // this is the second select clause
                println("Mike says '$value'")
            }
        }
    }
    

    select可以等任何一个回来,也可以等await:

    fun adult(): Deferred<String> = async(CommonPool) {
        // the adult stops the exchange after a while
        delay(Random().nextInt(2000).toLong())
        "Stop it!"
    }
    
    suspend fun selectInsult(john: ReceiveChannel<String>, mike: ReceiveChannel<String>,
                             adult: Deferred<String>) {
        select {
            // [..] the rest is like before
            adult.onAwait { value ->
                println("Exasperated adult says '$value'")
            }
        }
    }
    

    linux里的select其实类似,(能知道是哪个吗?):

    final override val onReceive: SelectClause1<E>
        get() = object : SelectClause1<E> {
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
                registerSelectReceive(select, block)
            }
        }
        
        
    private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
        while (true) {
            if (select.isSelected) return
            if (isEmpty) {
                val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
                val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
                when {
                    enqueueResult === ALREADY_SELECTED -> return
                    enqueueResult === ENQUEUE_FAILED -> {} // retry
                    else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
                }
            } else {
                val pollResult = pollSelectInternal(select)
                when {
                    pollResult === ALREADY_SELECTED -> return
                    pollResult === POLL_FAILED -> {} // retry
                    pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
                    else -> {
                        block.startCoroutineUnintercepted(pollResult as E, select.completion)
                        return
                    }
                }
            }
        }
    }
    

    能看到onReceive是实现SelectCaluse1,同时在selectBuilderImpl环境下:

    override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
        registerSelectClause1(this@SelectBuilderImpl, block)
    }
    

    所以会往queueenqueue两个receive节点。

    同时能看到如果任何一次select节点获取数据以后:

     when {
        pollResult === ALREADY_SELECTED -> return
        pollResult === POLL_FAILED -> {} // retry
        pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException)
        else -> {
            block.startCoroutineUnintercepted(pollResult as E, select.completion)
            return
        }
    }
    

    会调用block.startCoroutineUnintercepted:

    /**
     * Use this function to restart coroutine directly from inside of [suspendCoroutine],
     * when the code is already in the context of this coroutine.
     * It does not use [ContinuationInterceptor] and does not update context of the current thread.
     */
    internal fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, completion: Continuation<T>) {
        startDirect(completion) {  actualCompletion ->
            startCoroutineUninterceptedOrReturn(receiver, actualCompletion)
        }
    }
    

    之前讲过startCoroutineUnintercepted其实就是function.invoke(),所以就调用block.invoke(select的completion是自己),获得值后通过uCont.resume即可。

    onAwait

    这个和deferedjob(Support)搞在一起:

    private class SelectAwaitOnCompletion<T, R>(
        job: JobSupport,
        private val select: SelectInstance<R>,
        private val block: suspend (T) -> R
    ) : JobNode<JobSupport>(job) {
        override fun invoke(cause: Throwable?) {
            if (select.trySelect(null))
                job.selectAwaitCompletion(select, block)
        }
        override fun toString(): String = "SelectAwaitOnCompletion[$select]"
    }
    

    可以看到当任务成功后,select会被继续进行

    broadcast

    首先解决一个问题,一个sender多个receiver是怎么处理的。

    val channel = Channel<Int>()
    launch {
        val value1 = channel.receive()
    }
    launch {
        val value2 = channel.receive()
    }
    launch {
        channel.send(1)
    }
    

    因为是1vs1消费。只有第一个会收到,因为它插在等待队列的第一个。用broadcast可以保证大家都收到。它维护一个subscribeuser list,所有消费者都能收到channel.sendelement

    operation

    map

    public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
        GlobalScope.produce(context, onCompletion = consumes()) {
            consumeEach {
                send(transform(it))
            }
        }
    
    

    可以实现跟RX一样的操作符,接受者收到后经过转换再进行发送返回最终新的receiveChannel

    hot or cold

    channelhot的。

    When the data is produced by the Observable itself, we call it a cold Observable. When the data is produced outside the Observable, we call it a hot Observable.

    Provide abstraction for cold streams

    ... 这个todo,后续再说。

    参考

    Even smarter async with coroutine actors

    官方文档

    相关文章

      网友评论

        本文标题:Kotlin协程源码分析(二)之Channel

        本文链接:https://www.haomeiwen.com/subject/zckqyqtx.html