【kotlin】- 携程的执行流程

作者: 拔萝卜占坑 | 来源:发表于2021-09-12 16:02 被阅读0次

    简介

    这篇文章将从源码的角度,分析携程的执行流程,我们创建一个携程,系统是怎么进行调度的,什么时候执行的,是否需要创建新线程等等,带着这些疑问,一起往下看吧。

    例子先行

    fun main(): Unit = runBlocking {
        launch {
            println("${treadName()}======1")
        }
        GlobalScope.launch {
            println("${treadName()}======3")
        }
        launch {
            println("${treadName()}======2")
        }
        println("${treadName()}======4")
        Thread.sleep(2000)
    }
    

    输出如下:

    DefaultDispatcher-worker-1======3
    main======4
    main======1
    main======2
    
    Process finished with exit code 0
    

    根据打印,如果根据单线程执行流程来看,是不是感觉上面的日志打印顺序有点不好理解,下面我们就逐步来进行分解。

    • runBlocking携程体
      这里将其它代码省略到了,我这里都是按照一条简单的执行流程进行讲解。

      public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
      
          val eventLoop: EventLoop?
          val newContext: CoroutineContext
          ...
          if (contextInterceptor == null) {
              eventLoop = ThreadLocalEventLoop.eventLoop
              newContext = GlobalScope.newCoroutineContext(context + eventLoop)
          }
          ...
          val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
          coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
          return coroutine.joinBlocking()
      }
      

      看一下eventLoop的初始化,会 在当前线程(主线程)创建BlockingEventLoop对象。

      internal val eventLoop: EventLoop
            get() = ref.get() ?: createEventLoop().also { ref.set(it) }
      
      internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
      

      看一下newContext初始化,这里会对携程上下文进行组合,返回新的上下文。最后返回的是一个BlockingEventLoop对象。

      public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
         val combined = coroutineContext + context
          val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
          return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
            debug + Dispatchers.Default else debug
      }
      

      开始对携程进行调度

       coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
      

      看一下执行这句代码之前,各变量的值

      111.png
      而上面的代码最终调用的是CoroutineStart.DEFAULTinvoke方法。
        public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
            when (this) {
                DEFAULT -> block.startCoroutineCancellable(completion)
                ATOMIC -> block.startCoroutine(completion)
                UNDISPATCHED -> block.startCoroutineUndispatched(completion)
                LAZY -> Unit // will start lazily
            }
      

      我们使用的是DEFAULT启动模式。然后会执行resumeCancellableWith方法。

        inline fun resumeCancellableWith(
            result: Result<T>,
            noinline onCancellation: ((cause: Throwable) -> Unit)?
        ) {
            val state = result.toState(onCancellation)
            if (dispatcher.isDispatchNeeded(context)) {
                _state = state
                resumeMode = MODE_CANCELLABLE
                dispatcher.dispatch(context, this)
            } else {
                executeUnconfined(state, MODE_CANCELLABLE) {
                    if (!resumeCancelled(state)) {
                        resumeUndispatchedWith(result)
                    }
                }
            }
        }
      

      dispatcherBlockingEventLoop对象,没有重写isDispatchNeeded,默认返回true。然后调用dispatch继续进行分发。BlockingEventLoop继承了EventLoopImplBase并调用其dispatch方法。把任务加入到队列中。

      public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
      

      回到最开始,在coroutine.start(CoroutineStart.DEFAULT, coroutine, block)执行完,还执行了coroutine.joinBlocking()看一下实现。

          fun joinBlocking(): T {
            registerTimeLoopThread()
            try {
                eventLoop?.incrementUseCount()
                try {
                    while (true) {
                        @Suppress("DEPRECATION")
                        if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                        val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                        // note: process next even may loose unpark flag, so check if completed before parking
                        if (isCompleted) break
                        parkNanos(this, parkNanos)
                    }
                } finally { // paranoia
                    eventLoop?.decrementUseCount()
                }
            } finally { // paranoia
                unregisterTimeLoopThread()
            }
            // now return result
            val state = this.state.unboxState()
            (state as? CompletedExceptionally)?.let { throw it.cause }
            return state as T
        }
      

      执行val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE,取出任务进行执行,也就是runBlocking携程体。

    • launch {} 执行流程

      public fun CoroutineScope.launch(
          context: CoroutineContext = EmptyCoroutineContext,
          start: CoroutineStart = CoroutineStart.DEFAULT,
          block: suspend CoroutineScope.() -> Unit
      ): Job {
          val newContext = newCoroutineContext(context)
          val coroutine = if (start.isLazy)
              LazyStandaloneCoroutine(newContext, block) else
              StandaloneCoroutine(newContext, active = true)
          coroutine.start(start, coroutine, block)
          return coroutine
      }
      

      因为launch是直接在runBlocking(父携程体)里新的创建的子携程体,所以执行流程上和之前将的差不多,只不过不会像runBlocking再去创建BlockingEventLoop对象,而是直接用runBlocking(父携程体)的,然后把任务加到里面,所以通过这种方式其实就是单线程对任务的调度而已。所以在runBlocking(父携程体)内通过launch启动再多的携程体,其实都是在同一线程,按照任务队列的顺序执行的。

    根据上面日志输出,并没有先执行两个launch携程体,这是为什么呢,根据上面的讲解,应用知道,runBlocking(父携程体)是第一被添加的队列的任务,其次是launch,所以是这样的顺序。那可以让launch立即执行吗?答案是可以的,这就要说携程的启动模式了。

    • CoroutineStart 是协程的启动模式,存在以下4种模式:

      1. DEFAULT 立即调度,可以在执行前被取消
      2. LAZY 需要时才启动,需要start、join等函数触发才可进行调度
      3. ATOMIC 立即调度,协程肯定会执行,执行前不可以被取消
      4. UNDISPATCHED 立即在当前线程执行,直到遇到第一个挂起点(可能切线程)

      我们使用UNDISPATCHED就可以使携程体马上在当前线程执行。看一下是怎么实现的。看一下实现:

    使用这种启动模式执行UNDISPATCHED -> block.startCoroutineUndispatched(completion)方法。

    internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
        startDirect(completion) { actualCompletion ->
            withCoroutineContext(completion.context, null) {
                startCoroutineUninterceptedOrReturn(actualCompletion)
            }
        }
    }
    

    大家可以自己点击去看一下,大概就是会立即执行携程体,而不是将任务放入队列。

    但是GlobalScope.launch却不是按照这样的逻辑,这是因为GlobalScope.launch启动的全局携程,是一个独立的携程体了,并不是runBlocking(父携程体)子携程。看一下通过GlobalScope.launch有什么不同。

    • GlobalScope.launch执行流程
      1. 启动全局携程
      GlobalScope.launch
      
      newCoroutineContext(context)返回Dispatchers.Default对象。而DefaultScheduler继承了ExperimentalCoroutineDispatcher类。看一下ExperimentalCoroutineDispatcher中的dispatch代码:
       override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
           ...
               coroutineScheduler.dispatch(block)
           ...
      
      看一下coroutineScheduler初始化
      private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
      
      CoroutineScheduler实现了Executor接口,里面还有两个全局队列和线程池相关的参数。
      @JvmField
      val globalCpuQueue = GlobalQueue()
      @JvmField
      val globalBlockingQueue = GlobalQueue()
      
      继续调用CoroutineScheduler中的dispatch方法
        fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
            trackTask() // this is needed for virtual time support
            val task = createTask(block, taskContext)
            // try to submit the task to the local queue and act depending on the result
            val currentWorker = currentWorker()
            val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
            if (notAdded != null) {
                if (!addToGlobalQueue(notAdded)) {
                    // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                    throw RejectedExecutionException("$schedulerName was terminated")
                }
            }
            val skipUnpark = tailDispatch && currentWorker != null
            // Checking 'task' instead of 'notAdded' is completely okay
            if (task.mode == TASK_NON_BLOCKING) {
                if (skipUnpark) return
                signalCpuWork()
            } else {
                // Increment blocking tasks anyway
                signalBlockingWork(skipUnpark = skipUnpark)
            }
        }
      
      1. val task = createTask(block, taskContext)包装成TaskImpl对象。
      2. val currentWorker = currentWorker()当前是主线程,运行程序时由进程创建,肯定不是Worker对象,Worker是一个继承了Thread的类 ,并且在初始化时都指定为守护线程
        Worker存在5种状态:
        CPU_ACQUIRED 获取到cpu权限
        BLOCKING 正在执行IO阻塞任务
        PARKING 已处理完所有任务,线程挂起
        DORMANT 初始态
        TERMINATED 终止态
        
    1. val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)由于currentWorker是null,直接返回task对象。
    2. addToGlobalQueue(notAdded)根据任务是否是阻塞任务,将task添加到全局任务队列中。这里被添加到globalCpuQueue中。
    3. 执行signalCpuWork()来唤醒一个线程或者启动一个新的线程。
        fun signalCpuWork() {
          if (tryUnpark()) return
          if (tryCreateWorker()) return
          tryUnpark()
      }
    
     private fun tryCreateWorker(state: Long = controlState.value): Boolean {  
         val created = createdWorkers(state)// 创建的的线程总数  
         val blocking = blockingTasks(state)// 处理阻塞任务的线程数量  
         val cpuWorkers = (created - blocking).coerceAtLeast(0)//得到非阻塞任务的线程数量  
         if (cpuWorkers < corePoolSize) {// 小于核心线程数量,进行线程的创建  
             val newCpuWorkers = createNewWorker()  
             if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()// 当前非阻塞型线程数量为1,同时核心线程数量大于1时,再进行一个线程的创建,  
             if (newCpuWorkers > 0) return true  
         }  
         return false  
     }  
       
      // 创建线程  
      private fun createNewWorker(): Int {  
         synchronized(workers) {  
             ...  
             val created = createdWorkers(state)// 创建的的线程总数  
             val blocking = blockingTasks(state)// 阻塞的线程数量  
             val cpuWorkers = (created - blocking).coerceAtLeast(0) // 得到非阻塞线程数量  
             if (cpuWorkers >= corePoolSize) return 0//超过最大核心线程数,不能进行新线程创建  
            if (created >= maxPoolSize) return 0// 超过最大线程数限制,不能进行新线程创建  
             ...  
             val worker = Worker(newIndex)  
             workers[newIndex] = worker  
             require(newIndex == incrementCreatedWorkers())  
             worker.start()// 线程启动  
             return cpuWorkers + 1  
         }  
     }
    

    那么这里面的任务又是怎么调度的呢,当全局任务被执行的时候,看一下Worker中的run方法:

     override fun run() = runWorker()
    

    执行runWorker方法,该方法会从队列中找到执行任务,然后开始执行。详细代码,可以自行翻阅。

    所以GlobalScope.launch使用的就是线程池,没有所谓的性能好。

    • Dispatchers调度器
      Dispatchers是协程中提供的线程调度器,用来切换线程,指定协程所运行的线程。,上面用的是默认调度器Dispatchers.Default

    Dispatchers中提供了4种类型调度器:
    Default 默认调度器:适合CPU密集型任务调度器 比如逻辑计算;
    Main UI调度器
    Unconfined 无限制调度器:对协程执行的线程不做限制,协程恢复时可以在任意线程;
    IO调度器:适合IO密集型任务调度器 比如读写文件,网络请求等。

    相关文章

      网友评论

        本文标题:【kotlin】- 携程的执行流程

        本文链接:https://www.haomeiwen.com/subject/cdyawltx.html