美文网首页
庖丁解牛,一文搞懂Kotlin协程的线程池

庖丁解牛,一文搞懂Kotlin协程的线程池

作者: Android程序员老鸦 | 来源:发表于2023-09-02 03:51 被阅读0次

    上两篇文章梳理了协程的运行原理,因为线程池相对于协程实现来说是可以单独拿出来讲的,所以分析到线程池的时候没有继续深入,现在就单独来看看协程线程池的实现。
    协程线程池是由分发器Dispatchers来维护的,主要是Dispatchers.DEFAULT和Dispatchers.IO两个分发器。

    Dispatchers.DEFAULT

    Dispatchers.DEFAULT它持有的线程池是CoroutineScheduler:

      【SchedulerCoroutineDispatcher】
        internal open class SchedulerCoroutineDispatcher(
        private val corePoolSize: Int = CORE_POOL_SIZE,
        private val maxPoolSize: Int = MAX_POOL_SIZE,
        private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
        private val schedulerName: String = "CoroutineScheduler",
    ) : ExecutorCoroutineDispatcher() {
       private var coroutineScheduler = createScheduler()
    
        private fun createScheduler() =
            CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
        
        override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) // 线程池分发
    
        ...略
    }
    
      【CoroutineScheduler】
       internal class CoroutineScheduler(
        @JvmField val corePoolSize: Int, // 核心线程数
        @JvmField val maxPoolSize: Int, // 最大线程数
        @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, // 空闲线程存活时间
        @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME // 线程池名称"CoroutineScheduler"
    ) : Executor, Closeable {
        // 用于存储全局的纯CPU(不阻塞)任务
        @JvmField
        val globalCpuQueue = GlobalQueue()
    
        // 用于存储全局的执行非纯CPU(可能阻塞)任务
        @JvmField
        val globalBlockingQueue = GlobalQueue()
    
        // 用于记录当前处于Parked状态(一段时间后自动终止)的线程的数量
        private val parkedWorkersStack = atomic(0L)
    
        // 用于保存当前线程池中的线程
        // workers[0]永远为null,作为哨兵位
        // index从1到maxPoolSize为有效线程
        @JvmField
        val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)
    
        // 控制状态
        private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
        // 表示已经创建的线程的数量
        private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
        // 表示可以获取的CPU令牌数量,初始值为线程池核心线程数量
        private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
    
        // 获取指定的状态的已经创建的线程的数量
        private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
        // 获取指定的状态的执行阻塞任务的数量
        private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
        // 获取指定的状态的CPU令牌数量
        public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
    
        // 当前已经创建的线程数量加1
        private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
        // 当前已经创建的线程数量减1
        private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
        // 当前执行阻塞任务的线程数量加1
        private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
        // 当前执行阻塞任务的线程数量减1
        private inline fun decrementBlockingTasks() {
            controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
        }
    
        // 尝试获取CPU令牌
        private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
            val available = availableCpuPermits(state)
            if (available == 0) return false
            val update = state - (1L shl CPU_PERMITS_SHIFT)
            if (controlState.compareAndSet(state, update)) return true
        }
        // 释放CPU令牌
        private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
    
        // 表示当前线程池是否关闭
        private val _isTerminated = atomic(false)
        val isTerminated: Boolean get() = _isTerminated.value
    
    companion object {
        // 用于标记一个线程是否在parkedWorkersStack中(处于Parked状态)
        @JvmField
        val NOT_IN_STACK = Symbol("NOT_IN_STACK")
    
        // 线程的三个状态
        // CLAIMED表示线程可以执行任务
        // PARKED表示线程暂停执行任务,一段时间后会自动进入终止状态
        // TERMINATED表示线程处于终止状态
        private const val PARKED = -1
        private const val CLAIMED = 0
        private const val TERMINATED = 1
    
        // 以下五个常量为掩码
        private const val BLOCKING_SHIFT = 21 // 2x1024x1024
        // 1-21位
        private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
        // 22-42位
        private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
        // 42
        private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
        // 43-63位
        private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT
    
        // 以下两个常量用于require中参数判断
        internal const val MIN_SUPPORTED_POOL_SIZE = 1
        // 2x1024x1024-2
        internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
    
        // parkedWorkersStack的掩码
        private const val PARKED_INDEX_MASK = CREATED_MASK
        // inv表示01反转
        private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
        private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
    }
    
           
        
    
        fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
            trackTask() // this is needed for virtual time support
            // 传入ruanable和taskContext构建一个Task实例,taskContext决定线程是cpu密集型还是阻塞型,通过上面Dispatchers.DEFAULT的代码可以知
            // 道,它分发任务的时候是创建的NonBlockingContext,也就是非阻塞型的
            val task = createTask(block, taskContext) 
            // 判断当前线程是否运行在当前线程池
            val currentWorker = currentWorker()
            // 尝试加入本地队列,注意这个方法是Woker的扩展方法,这个本地队列是Woker的变量,
            // Woker是对Thread的一层封装,专门用于协程线程池里用的
            val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
            // notAdded不为null,代表加入本地任务队列失败,也代表此时的线程不是运行在线程池
            if (notAdded != null) {
                // 尝试加入全局任务队列,这里的全局指的是线程池里维护的两个变量
                if (!addToGlobalQueue(notAdded)) {
                    // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                    throw RejectedExecutionException("$schedulerName was terminated")
                }
            }
            val skipUnpark = tailDispatch && currentWorker != null
            // Checking 'task' instead of 'notAdded' is completely okay
            // 如果任务是非阻塞任务,则唤醒cpu线程
            if (task.mode == TASK_NON_BLOCKING) {
                if (skipUnpark) return
                signalCpuWork()
            } else {
                // 否则就唤醒阻塞线程
                signalBlockingWork(skipUnpark = skipUnpark)
            }
        }
        // currentWorker方法
        private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
    
        // Worker的扩展函数submitToLocalQueue
        private fun CoroutineScheduler.Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
          // Worker 为空,直接返回任务本身
          if (this == null) return task
          // 非阻塞的任务且此时Worker处于阻塞状态,则直接返回
          if (task.mode == TASK_NON_BLOCKING && state === CoroutineScheduler.WorkerState.BLOCKING) {
            return task
          }
          //表示本地队列里存有任务了
          mayHaveLocalTasks = true
          //加入到本地队列里
          //localQueue 为Worker的成员变量
          return localQueue.add(task, fair = tailDispatch)
        }
        // addToGlobalQueue方法
        private fun addToGlobalQueue(task: Task): Boolean {
          return if (task.isBlocking) {
            //加入到全局阻塞队列
            globalBlockingQueue.addLast(task)
        } else {
            //加入到全局cpu队列
            globalCpuQueue.addLast(task)
          }
        }
        fun signalCpuWork() {
         //尝试去唤醒正在挂起的线程,若是有线程可以被唤醒,则无需创建新线程
          if (tryUnpark()) return
          //若唤醒不成功,则需要尝试创建线程
          if (tryCreateWorker()) return
          //再试一次,边界条件
          tryUnpark()
        }
        // 尝试创建线程的方法
        private fun tryCreateWorker(state: Long = controlState.value): Boolean {
         //获取当前已经创建的线程数
          val created = createdWorkers(state)
          //获取当前阻塞的任务数
          val blocking = blockingTasks(state)
          //已创建的线程数-阻塞的任务数=非阻塞的线程数
          //coerceAtLeast(0) 表示结果至少是0
          val cpuWorkers = (created - blocking).coerceAtLeast(0)
          //如果非阻塞数小于核心线程数
        // 现在若是已经创建了5个线程,而这几个线程都在执行IO任务,此时就需要再创建新的线程来执行任务,因为此时CPU是空闲的。
         //只要非阻塞任务的个数小于核心线程数,那么就需要创建新的线程,目的是为了充分利用CPU。
          if (cpuWorkers < corePoolSize) {
              //创建线程
              val newCpuWorkers = createNewWorker()
              //如果当前只有一个非阻塞线程并且核心线程数>1,那么再创建一个线程
              //目的是为了方便"偷"任务...
              if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
              //创建成功
              if (newCpuWorkers > 0) return true
          }
          return false
        }
        // 创建线程的方法
        //workers 为Worker 数组,因为需要对数组进行add 操作,因此需要同步访问
        private fun createNewWorker(): Int {
          synchronized(workers) {
            if (isTerminated) return -1
            val state = controlState.value
            //获取已创建的线程数
            val created = createdWorkers(state)
            //阻塞的任务数
            val blocking = blockingTasks(state)
            //非阻塞的线程数
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            //非阻塞的线程数不能超过核心线程数
            if (cpuWorkers >= corePoolSize) return 0
            //已创建的线程数不能大于最大线程数
            if (created >= maxPoolSize) return 0
            val newIndex = createdWorkers + 1
            require(newIndex > 0 && workers[newIndex] == null)
            //构造线程
            val worker = Worker(newIndex)
            //记录到数组里
            workers[newIndex] = worker
            //记录创建的线程数
            require(newIndex == incrementCreatedWorkers())
            //开启线程
            worker.start()
            //当前非阻塞线程数
            return cpuWorkers + 1
        }
    }
    
        ...略
    }
    

    通过以上分析,知道有三个任务队列,这里要理清一下这些任务队列的关系:


    线程池dispatch的操作可以大概做个总结:
    1.先把分发下来的runable任务封装成Task,并标记它是阻塞型的还是非阻塞型的
    2.判断当前线程是否是运行在当前线程池里的线程,是的话就把传进来的Task直接加入当前线程的任务队列中
    3.否则根据task类型加入到线程池的相应任务队列中
    4.尝试唤醒相应类型的线程,没有的话就创建线程来执行工作
    线程池里的Worker

    看看真正执行任务的地方,重点看Worker里的runWorker方法

    // Worker内部类
        internal inner class Worker private constructor() : Thread() {
            init {
                isDaemon = true
            }
    
            // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
            @Volatile // volatile for push/pop operation into parkedWorkersStack
            var indexInArray = 0
                set(index) {
                    name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
                    field = index
                }
    
            constructor(index: Int) : this() {
                indexInArray = index
            }
    
            inline val scheduler get() = this@CoroutineScheduler
    
            @JvmField
            val localQueue: WorkQueue = WorkQueue()
            // 线程状态
            var state = WorkerState.DORMANT
            
    
            override fun run() = runWorker()
    
            private fun runWorker() {
                var rescanned = false
              //一直查找任务去执行,除非worker终止了
              while (!isTerminated && state != CoroutineScheduler.WorkerState.TERMINATED) {
                //从队列里寻找任务
                //mayHaveLocalTasks:本地队列里是否有任务
                val task = findTask(mayHaveLocalTasks)
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    //任务获取到后,执行任务
                    executeTask(task)
                    //任务执行完毕,继续循环查找任务
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                if (minDelayUntilStealableTaskNs != 0L) {
                    // 这个rescanned控制下面的分支代码的执行
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        //挂起一段时间再去偷
                            rescanned = false
                            tryReleaseCpu(WorkerState.PARKING)
                            interrupted()
                            LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                            minDelayUntilStealableTaskNs = 0L   // 只执行一次,下次循环不会再命中              
                    }  
                    continue
                }
              //尝试挂起
              tryPark()
            }
            //释放token
            tryReleaseCpu(CoroutineScheduler.WorkerState.TERMINATED)
          }
    
          fun findTask(scanLocalQueue: Boolean): Task? {
              //尝试获取cpu 许可
              //若是拿到cpu 许可,则可以执行任何任务
               // 它和核心线程数相关,假设我们是8核CPU,那么同一时间最多只能有8个线程在CPU上执行。因此,若是其它线程想
              // 要执行非阻塞任务(占用CPU),需要申请许可(token),申请成功说明有CPU空闲,此时该线程可以执行非阻塞任务。否则,只能执行阻塞任务。
              if (tryAcquireCpuPermit())
                   return findAnyTask(scanLocalQueue)
               //拿不到,若是本地队列有任务,则从本地取,否则从全局阻塞队列取
               val task = if (scanLocalQueue) {
                   localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
               } else {
                   globalBlockingQueue.removeFirstOrNull()
               }
               //都拿不到,则偷别人的
                // 当从本地队列、全局队列里都没找出任务时,当前的Worker打起了别个Woker的主意。我们知道全局队列是所有Worker共
              // 享,而本地队列是每个Worker私有的。因此,当前Worker发现自己没任务可以执行的时候会去看看其它Worker的本地队列里
              // 是否有可以执行的任务,若是有就可以偷过来用。
               return task ?: trySteal(blockingOnly = true)
           }
    
           private fun findAnyTask(scanLocalQueue: Boolean): Task? {
               if (scanLocalQueue) {
                   //可以从本地队列找
                   val globalFirst = nextInt(2 * corePoolSize) == 0
                   if (globalFirst) pollGlobalQueues()?.let { return it }
                   localQueue.poll()?.let { return it }
                   if (!globalFirst) pollGlobalQueues()?.let { return it }
               } else {
                   //从全局队列找
                   pollGlobalQueues()?.let { return it }
               }
               //偷别人的
               return trySteal(blockingOnly = false)
           }
    // 从全局队列获取任务
    private fun pollGlobalQueues(): Task? {
        // 随机获取CPU任务或者非CPU任务
        if (nextInt(2) == 0) {
            // 优先获取CPU任务
            globalCpuQueue.removeFirstOrNull()?.let { return it }
            return globalBlockingQueue.removeFirstOrNull()
        } else {
            // 优先获取非CPU任务
            globalBlockingQueue.removeFirstOrNull()?.let { return it }
            return globalCpuQueue.removeFirstOrNull()
        }
    }
    ————————————————
    版权声明:本文为CSDN博主「LeeDuo.」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126492774
    
    
          // 挂起函数,这是针对woker的状态
          private fun tryPark() {
              //没有在挂起栈里
            if (!inStack()) {
                //将worker放入挂起栈里
                parkedWorkersStackPush(this)
                return
            }
            while (inStack() && workerCtl.value == CoroutineScheduler.PARKED) { // Prevent spurious wakeups
                if (isTerminated || state == CoroutineScheduler.WorkerState.TERMINATED) break
                //真正挂起(不是实时,会暂时挂起一段时间idleWorkerKeepAliveNs,线程空闲时间),并标记worker state 状态,会修改state = WorkerState.TERMINATED,runWorker循环里会判断该标记,若是终止了,则循环停止,整个线程执行结束。
                park()
            }
    
            ...略
        }
    

    做个小总结:
    1.线程执行的时候从全局队列、本地队列里查找任务。
    2.若是没找到,则尝试从别的Worker 本地队列里偷取任务。
    3.能够找到任务则最终会执行协程体里的代码。
    4.若是没有任务,则根据策略挂起一段时间或是最终退出线程的执行。

    Dispatchers.IO:
    internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
    
        private val default = UnlimitedIoScheduler.limitedParallelism(
            systemProp(
                IO_PARALLELISM_PROPERTY_NAME,
                64.coerceAtLeast(AVAILABLE_PROCESSORS)
            )
        )
        override fun dispatch(context: CoroutineContext, block: Runnable) {
                default.dispatch(context, block)
        }
        ...略
    }
    
    // The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
    private object UnlimitedIoScheduler : CoroutineDispatcher() {
    
        @InternalCoroutinesApi
        override fun dispatchYield(context: CoroutineContext, block: Runnable) {
            DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
        }
         // 最终调用了DefaultScheduler的分分发方法
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
        }
    }
    // UnlimitedIoScheduler父类CoroutineDispatcher的方法
        public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
            parallelism.checkParallelism()
            // 返回一个UnlimitedIoScheduler的代理Dispatcher
            return LimitedDispatcher(this, parallelism)
        }
    

    通过以上代码可以看到Dispatchers.IO最终调用到的线程池分发方法是DEFAULT里的,而DEFAULT是个单例,所以两者其实共享了线程池CoroutineScheduler.
    但是随着对代理类LimitedDispatcher的深入研究发现Dispatchers.IO策略上有所不同。

    // 因为Dispatchers.IO是单例的,所以内部的这个LimitedDispatcher也是单例的,先看名字,这是一个受限制的分发器,
    // 限制啥?限制的是最大并行数量,由系统属性设定的值或 CPU 核心数的最大值决定,系统属性值一般设置的是 64,也就是说,一般来说,该调度器可能会创建 64 个线程来执行任务
    internal class LimitedDispatcher(
        private val dispatcher: CoroutineDispatcher,
        private val parallelism: Int
    ) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
    
        @Volatile
        private var runningWorkers = 0
    
        private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
    
        // A separate object that we can synchronize on for K/N
        private val workerAllocationLock = SynchronizedObject()
    
        @ExperimentalCoroutinesApi
        override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
            parallelism.checkParallelism()
            if (parallelism >= this.parallelism) return this
            return super.limitedParallelism(parallelism)
        }
    
        // 核心方法,分发任务
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            // 先执行dispatchInternal
            dispatchInternal(block) {
                // 再把自己作为runnable参数给到代理的dispatcher
                dispatcher.dispatch(this, this)
            }
        }
      
        private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
            // 添加任务到队列,如果此时最大并行任务超过了限制就直接return,暂时不执行dispatch方法
            if (addAndTryDispatching(block)) return
            // 统计当前的并行任务,超过了就直接return,暂时不执行dispatch方法
            if (!tryAllocateWorker()) return
            // 做完了添加操作执行dispatch(),此时才会真正分发到协程线程池CoroutineSchduler
            dispatch()
        }
        // 添加任务到队列并尝试分发
         private fun addAndTryDispatching(block: Runnable): Boolean {
             // 添加本地任务队列,添加不需要条件 来了就添加
            queue.addLast(block)
            // 判断正在执行的任务的数量是否大于最大并行线程数量,正是在这里做到了限制最大并行IO线程的作用
            return runningWorkers >= parallelism 
        }
        // 尝试分配线程,其实不会新建线程,只是统计一下当前的并行线程数量,在这里仍然会先判断是否大于了最大并行限制
        private fun tryAllocateWorker(): Boolean {
            synchronized(workerAllocationLock) {
                if (runningWorkers >= parallelism) return false
                ++runningWorkers
                return true
            }
        }
    
           // 注意他自己实现了Runnable,最终他是把自己送到CoroutineSchduler去执行的,协程传进来的任务在这里被包装了执行
        override fun run() {
            var fairnessCounter = 0
            // 别被这个循环体迷惑,就以为线程都是串行执行的,实际每次新协程创建任务都会执行run方法,然后最终包装到协程线程池去并发执行
            // 这个循环是为了执行超过最大并发数的时候,那些只添加到了队列但是没有立马执行的任务
            while (true) {
                val task = queue.removeFirstOrNull()
                if (task != null) {
                    try {
                        task.run()
                    } catch (e: Throwable) {
                        handleCoroutineException(EmptyCoroutineContext, e)
                    }
                    //  这里比较有意思,当有大量的并发任务(比如短时间添加了200个任务),那么这个方法有可能会长时间的执行下去,所以
                  // 这里为了公平起见不长期霸占资源,当超过执行了16个任务后就重新分发一次,这样就能短暂的让出cpu让别的线程执行(不知道理解的对不对)
                    if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
                        // Do "yield" to let other views to execute their runnable as well
                        // Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
                        dispatcher.dispatch(this, this)
                        return
                    }
                    continue
                }
    
                synchronized(workerAllocationLock) {
                    --runningWorkers
                    if (queue.size == 0) return
                    ++runningWorkers
                    fairnessCounter = 0
                }
            }
        }
    }
    

    由上可以看出Dispatchers.IO 任务分发是借助于DefaultScheduler,也就是Dispatchers.Default的能力,因此两者是共用一个线程池。
    只是Dispatchers.IO 比较特殊,它有个队列,该队列作用:
    当IO 任务分派个数超过设定的并行数时,不会直接进行分发,而是先存放在队列里。

    到此协程线程池基本原理分析完毕,下篇打算探讨一些协程的常用案例,看看哪些适合应用在实际开发中以及注意事项。

    相关文章

      网友评论

          本文标题:庖丁解牛,一文搞懂Kotlin协程的线程池

          本文链接:https://www.haomeiwen.com/subject/fdkzmdtx.html