深究Kotlin协程delay函数源码实现

作者: 飘飘然的影子 | 来源:发表于2023-07-20 11:32 被阅读0次

    前言

    在开发项目期间 Kotlin 协程是经常使用的异步&并发编程框架。在协程使用过程中,时常会用到挂起函数,而 delay 就是一个挂起函数,在很多业务场景中会使用到,本文通过源码分析了解其背后的实现原理

    分析

    举个🌰
    通常我们的业务场景有这种情况,需要延时执行某些任务

    private suspend fun test() {
        findViewLifecycleOwner()?.lifecycleScope?.launch() {
            print("before delay")
            //延时3s
            delay(3000)
            print("after delay")
        }
    }
    

    delay 函数则会挂起当前协程,并且会在3s后进行恢复
    再来看看 delay 函数的实现

    /**
     * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
     *
     */
    public suspend fun delay(timeMillis: Long) {
        if (timeMillis <= 0) return // don't delay
        return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
            // if timeMillis == Long.MAX_VALUE then just wait forever like  awaitCancellation, don't schedule.
            if (timeMillis < Long.MAX_VALUE) {
                cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
            }
        }
    }
    

    suspendCancellableCoroutine

    我们先来看看 suspendCancellableCoroutine 函数,它是 Kotlin 协程库中的一个函数,它是一个挂起函数,用于创建一个可以被取消的挂起点。
    这个函数也是协程中经常使用,它可以将异步的回调用同步的方式表达出来,减少回调嵌套

    public suspend inline fun <T> suspendCancellableCoroutine(
        crossinline block: (CancellableContinuation<T>) -> Unit
    ): T =
        suspendCoroutineUninterceptedOrReturn { uCont ->
            val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
            /*
             * For non-atomic cancellation we setup parent-child relationship immediately
             * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
             * properly supports cancellation.
             */
            cancellable.initCancellability()
            block(cancellable)
            cancellable.getResult()
        }
    

    然而 suspendCancellableCoroutine 函数内部是使用了suspendCoroutineUninterceptedOrReturn 函数实现的,而且也是一个挂起函数

    public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
        contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
        throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
    }
    

    suspendCoroutineUninterceptedOrReturn 函数没有实际内容,是因为它是编译器内建函数,它是由 Kotlin 编译器来实现的。主要作用是传递协程上下文,以及判断是否挂起的,或者直接返回结果
    再举个🌰
    写个👇方法,然后进行反编译看看

    private suspend fun test2(){
        suspendCancellableCoroutine<String> {
            print("suspendCancellableCoroutine test")
        }
    }
    

    反编译后

    // 传递了 Continuation 上下文 
    private final Object test2(Continuation $completion) {
       int $i$f$suspendCancellableCoroutine = false;
       int var4 = false;
       //封装实体
       CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
       cancellable$iv.initCancellability();
       CancellableContinuation it = (CancellableContinuation)cancellable$iv;
       int var7 = false;
       //执行block代码
       String var8 = "suspendCancellableCoroutine test";
       System.out.print(var8);
       //获取协程体返回值
       Object var10000 = cancellable$iv.getResult();
       if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
          DebugProbesKt.probeCoroutineSuspended($completion);
       }
       //判断是否是挂起,是的话返回挂起状态
       return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
    }
    

    正如上面所说的,suspendCoroutineUninterceptedOrReturn 函数是新增了传递协程上下文,以及判断是否挂起的,或者直接返回结果的逻辑
    知道了 suspendCancellableCoroutine 函数的作用后,再回过头看 delay 函数

    /**
     * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
     *
     */
    public suspend fun delay(timeMillis: Long) {
        if (timeMillis <= 0) return // don't delay
        return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
            // if timeMillis == Long.MAX_VALUE then just wait forever like  awaitCancellation, don't schedule.
            if (timeMillis < Long.MAX_VALUE) {
                cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
            }
        }
    }
    

    suspendCancellableCoroutine 函数传递了封装后的协程对象 cont,以及判断 cont 是否会执行挂起

    DefaultExecutor

    看看 cont.context.delay是啥

    //获取delay
    internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
    
    //默认实现
    internal actual val DefaultDelay: Delay = DefaultExecutor
    
    @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
    internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
    
     init {
        incrementUseCount() // this event loop is never completed
     }
        ... 省略很多代码 ...
    }
    

    可以看见 cont.context.delay 最终的实现是 DefaultExecutor ,它继承了EventLoopImplBase 和 Runnable
    DefaultExecutor 是个单例,里边开启了线程,并且检测队列里任务的情况来决定是否需要挂起线程进行等待
    而 scheduleResumeAfterDelay 函数是 EventLoopImplBase 里实现的

    public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                continuation.disposeOnCancellation(task)
                schedule(now, task)
            }
        }
    }
    

    如果满足时间条件,则创建一个延迟的task,DelayedResumeTask

    入队列

    看 schedule 函数

    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            //unpark() 会启动线程
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }
    
    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        //入队列
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }
    

    实际就是把 DelayedResumeTask 放进 _delayed 队列里面,并且启动 DefaultExecutor 里的线程(如果没有开启的话)
    _delayed 是存储延迟执行 task 的队列
    还有一个 _queue 是是存储正常任务的队列

    internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
        // null | CLOSED_EMPTY | task | Queue<Runnable>
        private val _queue = atomic<Any?>(null)
    
        // Allocated only only once
        private val _delayed = atomic<DelayedTaskQueue?>(null)
        ...
        }
    

    既然上面有入队列,就有出队列

    出队列

    回到刚刚说的启动了 DefaultExecutor 里的线程,看看它的 run 方法

    override fun run() {
        ThreadLocalEventLoop.setEventLoop(this)
        registerTimeLoopThread()
        try {
            var shutdownNanos = Long.MAX_VALUE
            if (!notifyStartup()) return
            while (true) {
                Thread.interrupted() // just reset interruption flag
                //取队列核心代码
                var parkNanos = processNextEvent()
                if (parkNanos == Long.MAX_VALUE) {
                    // nothing to do, initialize shutdown timeout
                    val now = nanoTime()
                    if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
                    val tillShutdown = shutdownNanos - now
                    if (tillShutdown <= 0) return // shut thread down
                    parkNanos = parkNanos.coerceAtMost(tillShutdown)
                } else
                    shutdownNanos = Long.MAX_VALUE
                    //如果返回的时间大于0
                if (parkNanos > 0) {
                    // check if shutdown was requested and bail out in this case
                    if (isShutdownRequested) return
                    //挂起线程一段时间
                    parkNanos(this, parkNanos)
                }
            }
        } finally {
            _thread = null // this thread is dead
            acknowledgeShutdownIfNeeded()
            unregisterTimeLoopThread()
            // recheck if queues are empty after _thread reference was set to null (!!!)
            if (!isEmpty) thread // recreate thread if it is needed
        }
    }
    

    取出队列里的 task 逻辑是在 processNextEvent() 函数中,看看它的实现

    override fun processNextEvent(): Long {
        // unconfined events take priority
        if (processUnconfinedEvent()) return 0
        // queue all delayed tasks that are due to be executed
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = nanoTime()
            while (true) {
                // make sure that moving from delayed to queue removes from delayed only after it is added to queue
                // to make sure that 'isEmpty' and `nextTime` that check both of them
                // do not transiently report that both delayed and queue are empty during move
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                    // 加入正常任务队列
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
            }
        }
        // then process one event from queue
        val task = dequeue()
        if (task != null) {
            task.run()
            return 0
        }
        //返回线程需要挂起的时间
        return nextTime
    }
    
    protected override val nextTime: Long
        get() {
            if (super.nextTime == 0L) return 0L
            val queue = _queue.value
            when {
                queue === null -> {} // empty queue -- proceed
                queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
                queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
                else -> return 0 // non-empty queue
            }
            val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
    

    其中流程是:

    1. 有个死循环一直在从 _delayed 队列里取延迟 task,如果判断延迟时间已经到了才会加入正常任务队列里且移除
    2. 直到取不出延迟 task 了才跳出循环
    3. 然后从正常队列里取出任务进行执行
    4. 执行任务就是在执行 DelayedResumeTask 类里的 run 方法
       private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
        override fun run() { with(cont) { resumeUndispatched(Unit) } }
        override fun toString(): String = super.toString() + cont.toString()
    }
    
    1. 可以是看见调用了 resumeUndispatched() 函数,使用协程能经常看见或者使用 resumexxx 函数,它就是协程中的恢复,对应于挂起,它们理应是成对出现的
    2. resumeUndispatched() 函数最终的实现就是一开始封装 Continuation 协程为 CancellableContinuationImpl 的实现,即是 cont 对象,最终恢复了协程的运行
    //恢复协程
    override fun CoroutineDispatcher.resumeUndispatched(value: T) {
        val dc = delegate as? DispatchedContinuation
        resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
    }
    

    processNextEvent() 函数的返回值

    • 最后返回下一个 nextDelayedTask 的延迟时间,即是线程挂起时间
    • 如果返回的时间大于0,则最终会调用 LockSupport.parkNanos(),将线程挂起一段时间,直到延迟时间结束。
      源码看到这里就能知道 delay 函数的背后运行原理了,最终是由 DefaultExecutor 线程整体控制挂起和恢复,而且不会阻塞当前调用方的线程。

    总结

    delay 函数的总体流程是

    1. 创建一个延迟任务(DelayedResumeTask),它包含需要延迟执行的逻辑和执行时间信息。
    2. 将延迟任务添加到延迟队列(_delayed)中。
    3. 在等待延迟任务执行之前,协程会被暂停(挂起)。
    4. 有一个单独的 DefaultExecutor 线程定期检查延迟队列中的任务,如果任务的延迟时间到了,就会将任务从延迟队列中移除,并将其放入执行队列中。
    5. 执行队列中取出任务,并调用其执行逻辑。
    6. 执行任务的过程也就是协程的恢复过程,一旦任务开始执行,协程会被恢复,继续执行其后续的逻辑。

    相关文章

      网友评论

        本文标题:深究Kotlin协程delay函数源码实现

        本文链接:https://www.haomeiwen.com/subject/wvicydtx.html