美文网首页Android开发Android开发经验谈
【kotlin】- delay函数实现原理

【kotlin】- delay函数实现原理

作者: 拔萝卜占坑 | 来源:发表于2021-09-11 19:00 被阅读0次

    简介

    这片文章主要讲解kotlindelay函数的实现原理,delay是一个挂起函数。kotlin携程使用过程中,经常使用到挂起函数,在我学习kotlin携程的时候,一些现象让我很是困惑,所以打算从源码角度来逐一分析。

    说明

    在分析delay源码实现过程中,由于对kotlin有些语法还不是很熟悉,所以并不会把每一步将得很透彻,只会梳理一个大致的流程,如果讲解有误的地方,欢迎指出。

    例子先行

    fun main() = runBlocking {
        println("${treadName()}======start")
        launch {
            println("${treadName()}======delay 1s  start")
            delay(1000)
            println("${treadName()}======delay 1s end")
        }
    
        println("${treadName()}======delay 3s start")
        delay(3000)
        println("${treadName()}======delay 3s end")
        // 延迟,保活进程
        Thread.sleep(500000)
    }
    

    输出如下:

    main======start
    main======delay 3s start
    main======delay 1s  start
    main======delay 1s end
    main======delay 3s end
    

    根据日志可以看出:

    1. 日志输出环境是在主线程。
    2. 执行3s延迟函数后,切换到了launch携程体执行。
    3. delay挂起函数恢复后执行各自的打印函数。

    \color{blue}{疑问}
    如果真像打印日志输出一样,所以的操作都是在一个线程(主线程)完成,那么问题来了。第一:按照Java线程知识,单线程执行是按照顺序的,是单条线的。那么不管delay里是何等骚操作,只要没有重新起线程,应该不能够像上面输入的那样吧,你说sleepwait,如果你这么想,那么你可以去补一补Java多线程基础知识了。猜想1. 难得真有什么我不知道的骚操作可以在一个线程里面同时执行delay和其它代码,真像很多人说的,携程性能很好,使用挂起函数可以不用启动新的线程,就可以异步执行,那真的就很不错。2. delay启动了新的线程,上面的现象只不过是进行了线程切换,那么如果多次调用 delay那么岂不是要创建很多线程,这性能问题和资源问题怎么解决。3. delay基于某种任务调度策略。

    delay源码

    public suspend inline fun <T> suspendCancellableCoroutine(
        crossinline block: (CancellableContinuation<T>) -> Unit
    ): T =
        suspendCoroutineUninterceptedOrReturn { uCont ->
            val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
            cancellable.initCancellability()
            block(cancellable)
            cancellable.getResult()
    }
    

    cancellable是一个CancellableContinuationImpl对象,执行 block(cancellable),回到下面函数。

    public suspend fun delay(timeMillis: Long) {
        if (timeMillis <= 0) return // don't delay
        return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
            // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
            if (timeMillis < Long.MAX_VALUE) {
                cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
            }
        }
    }
    

    看一下cont.context.delayget方法

    internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
    

    如果get(ContinuationInterceptor)Delay类型对象,那么直接返回该对象,如果不是返回DefaultDelay变量,看一下DefaultDelay初始化可以知道,它是一个DefaultExecutor对象,继承了EventLoopImplBase类。

    runBlocking执行过程中有这样一行代码createCoroutineUnintercepted(receiver, completion).intercepted()会被ContinuationInterceptor进行包装。所以上面cont.context.delay返回的就是被包装的携程体上下文。

    查看scheduleResumeAfterDelay方法。

        public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
            val timeNanos = delayToNanos(timeMillis)
            if (timeNanos < MAX_DELAY_NS) {
                val now = nanoTime()
                DelayedResumeTask(now + timeNanos, continuation).also { task ->
                    continuation.disposeOnCancellation(task)
                    schedule(now, task)
                }
            }
        }
    

    创建DelayedResumeTask对象,在also执行相关计划任务,看一下schedule方法。

        public fun schedule(now: Long, delayedTask: DelayedTask) {
            when (scheduleImpl(now, delayedTask)) {
                SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
                SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
                SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
                else -> error("unexpected result")
            }
        }
    

    这里返回SCHEDULE_OK,执行unpark函数,这里用到了Java提供的LockSupport线程操作相关知识。

    读取线程

      val thread = thread
    
    • 如果delay是当前携程的上下文
      那么把延时任务加入到队列后,那么又是怎么达到线程延迟呢。回到runBlocking执行流程,会执行coroutine.joinBlocking()这样一行代码。

        fun joinBlocking(): T {
            registerTimeLoopThread()
            try {
                eventLoop?.incrementUseCount()
                try {
                    while (true) {
                        @Suppress("DEPRECATION")
                        if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                        val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                        // note: process next even may loose unpark flag, so check if completed before parking
                        if (isCompleted) break
                        parkNanos(this, parkNanos)
                    }
                } finally { // paranoia
                    eventLoop?.decrementUseCount()
                }
            } finally { // paranoia
                unregisterTimeLoopThread()
            }
            // now return result
            val state = this.state.unboxState()
            (state as? CompletedExceptionally)?.let { throw it.cause }
            return state as T
        }
      

      执行:

       val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
      

      看一下processNextEvent

        override fun processNextEvent(): Long {
            // unconfined events take priority
            if (processUnconfinedEvent()) return 0
            // queue all delayed tasks that are due to be executed
            val delayed = _delayed.value
            if (delayed != null && !delayed.isEmpty) {
                val now = nanoTime()
                while (true) {         
                    delayed.removeFirstIf {
                        if (it.timeToExecute(now)) {
                            enqueueImpl(it)
                        } else
                            false
                    } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
                }
            }
            // then process one event from queue
            val task = dequeue()
            if (task != null) {
                task.run()
                return 0
            }
            return nextTime
        }
      

      从延迟队列取任务

      val delayed = _delayed.value
      

      挂起当前线程

      parkNanos(this, parkNanos)
      

      这里是一个while循环,当挂起时间到,线程唤醒,继续从任务队列中取任务执行。如果还是延迟任务,这根据当前时间点,计算线程需要挂起的时间,这也是为什么多个延迟任务好像是同时执行的。

    • 如果delay是DefaultExecutor
      比如这个例子:携程上下文没有像CoroutineStart.DEFAULT那样进行包装。

      fun main() {
        GlobalScope.launch(start = CoroutineStart.UNDISPATCHED){
             println("${treadName()}======我开始执行了~")
             delay(1000)
              println("${treadName()}======全局携程~")
          }
          println("${treadName()}======我要睡觉~")
          Thread.sleep(3000)
      }
      

      然后调用DefaultExecutor类中thread的get方法:

        override val thread: Thread
            get() = _thread ?: createThreadSync()
      

      看一下createThreadSync函数

        private fun createThreadSync(): Thread {
            return _thread ?: Thread(this, THREAD_NAME).apply {
                _thread = this
                isDaemon = true
                start()
            }
        }
      

      创建一个叫"kotlinx.coroutines.DefaultExecutor的新线程,并且开始运行。这时候会执行DefaultExecutor中的run方法。在run方法中有这样一行代码:

      parkNanos(this, parkNanos)
      

      点进去看看:

      internal inline fun parkNanos(blocker: Any, nanos: Long) {
        timeSource?.parkNanos(blocker, nanos) ?: LockSupport.parkNanos(blocker,   nanos)
      }
      

      调用Java提供的LockSupport.parkNanos(blocker, nanos)方法,阻塞当前线程,实现挂起,当达到阻塞的时间,恢复线程执行。

    查看进行中线程情况方法

    fun main() {
        println("${treadName()}======doSuspendTwo")
        Thread.sleep(500000)
    }
    

    运行main,通过命令jps找到对应Java进程(没有特别指定,进程名为文件名)号。

    ...
    3406 KotlinCoreutinesSuspendKt
    ...
    

    执行jstack 进程号查看进程对应的线程资源。

    相关文章

      网友评论

        本文标题:【kotlin】- delay函数实现原理

        本文链接:https://www.haomeiwen.com/subject/bllawltx.html