上两篇文章梳理了协程的运行原理,因为线程池相对于协程实现来说是可以单独拿出来讲的,所以分析到线程池的时候没有继续深入,现在就单独来看看协程线程池的实现。
协程线程池是由分发器Dispatchers来维护的,主要是Dispatchers.DEFAULT和Dispatchers.IO两个分发器。
Dispatchers.DEFAULT
Dispatchers.DEFAULT它持有的线程池是CoroutineScheduler:
【SchedulerCoroutineDispatcher】
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) // 线程池分发
...略
}
【CoroutineScheduler】
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int, // 核心线程数
@JvmField val maxPoolSize: Int, // 最大线程数
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, // 空闲线程存活时间
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME // 线程池名称"CoroutineScheduler"
) : Executor, Closeable {
// 用于存储全局的纯CPU(不阻塞)任务
@JvmField
val globalCpuQueue = GlobalQueue()
// 用于存储全局的执行非纯CPU(可能阻塞)任务
@JvmField
val globalBlockingQueue = GlobalQueue()
// 用于记录当前处于Parked状态(一段时间后自动终止)的线程的数量
private val parkedWorkersStack = atomic(0L)
// 用于保存当前线程池中的线程
// workers[0]永远为null,作为哨兵位
// index从1到maxPoolSize为有效线程
@JvmField
val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)
// 控制状态
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
// 表示已经创建的线程的数量
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
// 表示可以获取的CPU令牌数量,初始值为线程池核心线程数量
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
// 获取指定的状态的已经创建的线程的数量
private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
// 获取指定的状态的执行阻塞任务的数量
private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
// 获取指定的状态的CPU令牌数量
public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
// 当前已经创建的线程数量加1
private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
// 当前已经创建的线程数量减1
private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
// 当前执行阻塞任务的线程数量加1
private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
// 当前执行阻塞任务的线程数量减1
private inline fun decrementBlockingTasks() {
controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
}
// 尝试获取CPU令牌
private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
val available = availableCpuPermits(state)
if (available == 0) return false
val update = state - (1L shl CPU_PERMITS_SHIFT)
if (controlState.compareAndSet(state, update)) return true
}
// 释放CPU令牌
private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
// 表示当前线程池是否关闭
private val _isTerminated = atomic(false)
val isTerminated: Boolean get() = _isTerminated.value
companion object {
// 用于标记一个线程是否在parkedWorkersStack中(处于Parked状态)
@JvmField
val NOT_IN_STACK = Symbol("NOT_IN_STACK")
// 线程的三个状态
// CLAIMED表示线程可以执行任务
// PARKED表示线程暂停执行任务,一段时间后会自动进入终止状态
// TERMINATED表示线程处于终止状态
private const val PARKED = -1
private const val CLAIMED = 0
private const val TERMINATED = 1
// 以下五个常量为掩码
private const val BLOCKING_SHIFT = 21 // 2x1024x1024
// 1-21位
private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
// 22-42位
private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
// 42
private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
// 43-63位
private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT
// 以下两个常量用于require中参数判断
internal const val MIN_SUPPORTED_POOL_SIZE = 1
// 2x1024x1024-2
internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
// parkedWorkersStack的掩码
private const val PARKED_INDEX_MASK = CREATED_MASK
// inv表示01反转
private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
}
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
// 传入ruanable和taskContext构建一个Task实例,taskContext决定线程是cpu密集型还是阻塞型,通过上面Dispatchers.DEFAULT的代码可以知
// 道,它分发任务的时候是创建的NonBlockingContext,也就是非阻塞型的
val task = createTask(block, taskContext)
// 判断当前线程是否运行在当前线程池
val currentWorker = currentWorker()
// 尝试加入本地队列,注意这个方法是Woker的扩展方法,这个本地队列是Woker的变量,
// Woker是对Thread的一层封装,专门用于协程线程池里用的
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
// notAdded不为null,代表加入本地任务队列失败,也代表此时的线程不是运行在线程池
if (notAdded != null) {
// 尝试加入全局任务队列,这里的全局指的是线程池里维护的两个变量
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
// 如果任务是非阻塞任务,则唤醒cpu线程
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// 否则就唤醒阻塞线程
signalBlockingWork(skipUnpark = skipUnpark)
}
}
// currentWorker方法
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
// Worker的扩展函数submitToLocalQueue
private fun CoroutineScheduler.Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
// Worker 为空,直接返回任务本身
if (this == null) return task
// 非阻塞的任务且此时Worker处于阻塞状态,则直接返回
if (task.mode == TASK_NON_BLOCKING && state === CoroutineScheduler.WorkerState.BLOCKING) {
return task
}
//表示本地队列里存有任务了
mayHaveLocalTasks = true
//加入到本地队列里
//localQueue 为Worker的成员变量
return localQueue.add(task, fair = tailDispatch)
}
// addToGlobalQueue方法
private fun addToGlobalQueue(task: Task): Boolean {
return if (task.isBlocking) {
//加入到全局阻塞队列
globalBlockingQueue.addLast(task)
} else {
//加入到全局cpu队列
globalCpuQueue.addLast(task)
}
}
fun signalCpuWork() {
//尝试去唤醒正在挂起的线程,若是有线程可以被唤醒,则无需创建新线程
if (tryUnpark()) return
//若唤醒不成功,则需要尝试创建线程
if (tryCreateWorker()) return
//再试一次,边界条件
tryUnpark()
}
// 尝试创建线程的方法
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
//获取当前已经创建的线程数
val created = createdWorkers(state)
//获取当前阻塞的任务数
val blocking = blockingTasks(state)
//已创建的线程数-阻塞的任务数=非阻塞的线程数
//coerceAtLeast(0) 表示结果至少是0
val cpuWorkers = (created - blocking).coerceAtLeast(0)
//如果非阻塞数小于核心线程数
// 现在若是已经创建了5个线程,而这几个线程都在执行IO任务,此时就需要再创建新的线程来执行任务,因为此时CPU是空闲的。
//只要非阻塞任务的个数小于核心线程数,那么就需要创建新的线程,目的是为了充分利用CPU。
if (cpuWorkers < corePoolSize) {
//创建线程
val newCpuWorkers = createNewWorker()
//如果当前只有一个非阻塞线程并且核心线程数>1,那么再创建一个线程
//目的是为了方便"偷"任务...
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
//创建成功
if (newCpuWorkers > 0) return true
}
return false
}
// 创建线程的方法
//workers 为Worker 数组,因为需要对数组进行add 操作,因此需要同步访问
private fun createNewWorker(): Int {
synchronized(workers) {
if (isTerminated) return -1
val state = controlState.value
//获取已创建的线程数
val created = createdWorkers(state)
//阻塞的任务数
val blocking = blockingTasks(state)
//非阻塞的线程数
val cpuWorkers = (created - blocking).coerceAtLeast(0)
//非阻塞的线程数不能超过核心线程数
if (cpuWorkers >= corePoolSize) return 0
//已创建的线程数不能大于最大线程数
if (created >= maxPoolSize) return 0
val newIndex = createdWorkers + 1
require(newIndex > 0 && workers[newIndex] == null)
//构造线程
val worker = Worker(newIndex)
//记录到数组里
workers[newIndex] = worker
//记录创建的线程数
require(newIndex == incrementCreatedWorkers())
//开启线程
worker.start()
//当前非阻塞线程数
return cpuWorkers + 1
}
}
...略
}
通过以上分析,知道有三个任务队列,这里要理清一下这些任务队列的关系:
线程池dispatch的操作可以大概做个总结:
1.先把分发下来的runable任务封装成Task,并标记它是阻塞型的还是非阻塞型的
2.判断当前线程是否是运行在当前线程池里的线程,是的话就把传进来的Task直接加入当前线程的任务队列中
3.否则根据task类型加入到线程池的相应任务队列中
4.尝试唤醒相应类型的线程,没有的话就创建线程来执行工作
线程池里的Worker
看看真正执行任务的地方,重点看Worker里的runWorker方法
// Worker内部类
internal inner class Worker private constructor() : Thread() {
init {
isDaemon = true
}
// guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
@Volatile // volatile for push/pop operation into parkedWorkersStack
var indexInArray = 0
set(index) {
name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
field = index
}
constructor(index: Int) : this() {
indexInArray = index
}
inline val scheduler get() = this@CoroutineScheduler
@JvmField
val localQueue: WorkQueue = WorkQueue()
// 线程状态
var state = WorkerState.DORMANT
override fun run() = runWorker()
private fun runWorker() {
var rescanned = false
//一直查找任务去执行,除非worker终止了
while (!isTerminated && state != CoroutineScheduler.WorkerState.TERMINATED) {
//从队列里寻找任务
//mayHaveLocalTasks:本地队列里是否有任务
val task = findTask(mayHaveLocalTasks)
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
//任务获取到后,执行任务
executeTask(task)
//任务执行完毕,继续循环查找任务
continue
} else {
mayHaveLocalTasks = false
}
if (minDelayUntilStealableTaskNs != 0L) {
// 这个rescanned控制下面的分支代码的执行
if (!rescanned) {
rescanned = true
} else {
//挂起一段时间再去偷
rescanned = false
tryReleaseCpu(WorkerState.PARKING)
interrupted()
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
minDelayUntilStealableTaskNs = 0L // 只执行一次,下次循环不会再命中
}
continue
}
//尝试挂起
tryPark()
}
//释放token
tryReleaseCpu(CoroutineScheduler.WorkerState.TERMINATED)
}
fun findTask(scanLocalQueue: Boolean): Task? {
//尝试获取cpu 许可
//若是拿到cpu 许可,则可以执行任何任务
// 它和核心线程数相关,假设我们是8核CPU,那么同一时间最多只能有8个线程在CPU上执行。因此,若是其它线程想
// 要执行非阻塞任务(占用CPU),需要申请许可(token),申请成功说明有CPU空闲,此时该线程可以执行非阻塞任务。否则,只能执行阻塞任务。
if (tryAcquireCpuPermit())
return findAnyTask(scanLocalQueue)
//拿不到,若是本地队列有任务,则从本地取,否则从全局阻塞队列取
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
//都拿不到,则偷别人的
// 当从本地队列、全局队列里都没找出任务时,当前的Worker打起了别个Woker的主意。我们知道全局队列是所有Worker共
// 享,而本地队列是每个Worker私有的。因此,当前Worker发现自己没任务可以执行的时候会去看看其它Worker的本地队列里
// 是否有可以执行的任务,若是有就可以偷过来用。
return task ?: trySteal(blockingOnly = true)
}
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
if (scanLocalQueue) {
//可以从本地队列找
val globalFirst = nextInt(2 * corePoolSize) == 0
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
} else {
//从全局队列找
pollGlobalQueues()?.let { return it }
}
//偷别人的
return trySteal(blockingOnly = false)
}
// 从全局队列获取任务
private fun pollGlobalQueues(): Task? {
// 随机获取CPU任务或者非CPU任务
if (nextInt(2) == 0) {
// 优先获取CPU任务
globalCpuQueue.removeFirstOrNull()?.let { return it }
return globalBlockingQueue.removeFirstOrNull()
} else {
// 优先获取非CPU任务
globalBlockingQueue.removeFirstOrNull()?.let { return it }
return globalCpuQueue.removeFirstOrNull()
}
}
————————————————
版权声明:本文为CSDN博主「LeeDuo.」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126492774
// 挂起函数,这是针对woker的状态
private fun tryPark() {
//没有在挂起栈里
if (!inStack()) {
//将worker放入挂起栈里
parkedWorkersStackPush(this)
return
}
while (inStack() && workerCtl.value == CoroutineScheduler.PARKED) { // Prevent spurious wakeups
if (isTerminated || state == CoroutineScheduler.WorkerState.TERMINATED) break
//真正挂起(不是实时,会暂时挂起一段时间idleWorkerKeepAliveNs,线程空闲时间),并标记worker state 状态,会修改state = WorkerState.TERMINATED,runWorker循环里会判断该标记,若是终止了,则循环停止,整个线程执行结束。
park()
}
...略
}
做个小总结:
1.线程执行的时候从全局队列、本地队列里查找任务。
2.若是没找到,则尝试从别的Worker 本地队列里偷取任务。
3.能够找到任务则最终会执行协程体里的代码。
4.若是没有任务,则根据策略挂起一段时间或是最终退出线程的执行。
Dispatchers.IO:
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
private val default = UnlimitedIoScheduler.limitedParallelism(
systemProp(
IO_PARALLELISM_PROPERTY_NAME,
64.coerceAtLeast(AVAILABLE_PROCESSORS)
)
)
override fun dispatch(context: CoroutineContext, block: Runnable) {
default.dispatch(context, block)
}
...略
}
// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() {
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
}
// 最终调用了DefaultScheduler的分分发方法
override fun dispatch(context: CoroutineContext, block: Runnable) {
DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
}
}
// UnlimitedIoScheduler父类CoroutineDispatcher的方法
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
// 返回一个UnlimitedIoScheduler的代理Dispatcher
return LimitedDispatcher(this, parallelism)
}
通过以上代码可以看到Dispatchers.IO最终调用到的线程池分发方法是DEFAULT里的,而DEFAULT是个单例,所以两者其实共享了线程池CoroutineScheduler.
但是随着对代理类LimitedDispatcher的深入研究发现Dispatchers.IO策略上有所不同。
// 因为Dispatchers.IO是单例的,所以内部的这个LimitedDispatcher也是单例的,先看名字,这是一个受限制的分发器,
// 限制啥?限制的是最大并行数量,由系统属性设定的值或 CPU 核心数的最大值决定,系统属性值一般设置的是 64,也就是说,一般来说,该调度器可能会创建 64 个线程来执行任务
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {
@Volatile
private var runningWorkers = 0
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
// A separate object that we can synchronize on for K/N
private val workerAllocationLock = SynchronizedObject()
@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= this.parallelism) return this
return super.limitedParallelism(parallelism)
}
// 核心方法,分发任务
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 先执行dispatchInternal
dispatchInternal(block) {
// 再把自己作为runnable参数给到代理的dispatcher
dispatcher.dispatch(this, this)
}
}
private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
// 添加任务到队列,如果此时最大并行任务超过了限制就直接return,暂时不执行dispatch方法
if (addAndTryDispatching(block)) return
// 统计当前的并行任务,超过了就直接return,暂时不执行dispatch方法
if (!tryAllocateWorker()) return
// 做完了添加操作执行dispatch(),此时才会真正分发到协程线程池CoroutineSchduler
dispatch()
}
// 添加任务到队列并尝试分发
private fun addAndTryDispatching(block: Runnable): Boolean {
// 添加本地任务队列,添加不需要条件 来了就添加
queue.addLast(block)
// 判断正在执行的任务的数量是否大于最大并行线程数量,正是在这里做到了限制最大并行IO线程的作用
return runningWorkers >= parallelism
}
// 尝试分配线程,其实不会新建线程,只是统计一下当前的并行线程数量,在这里仍然会先判断是否大于了最大并行限制
private fun tryAllocateWorker(): Boolean {
synchronized(workerAllocationLock) {
if (runningWorkers >= parallelism) return false
++runningWorkers
return true
}
}
// 注意他自己实现了Runnable,最终他是把自己送到CoroutineSchduler去执行的,协程传进来的任务在这里被包装了执行
override fun run() {
var fairnessCounter = 0
// 别被这个循环体迷惑,就以为线程都是串行执行的,实际每次新协程创建任务都会执行run方法,然后最终包装到协程线程池去并发执行
// 这个循环是为了执行超过最大并发数的时候,那些只添加到了队列但是没有立马执行的任务
while (true) {
val task = queue.removeFirstOrNull()
if (task != null) {
try {
task.run()
} catch (e: Throwable) {
handleCoroutineException(EmptyCoroutineContext, e)
}
// 这里比较有意思,当有大量的并发任务(比如短时间添加了200个任务),那么这个方法有可能会长时间的执行下去,所以
// 这里为了公平起见不长期霸占资源,当超过执行了16个任务后就重新分发一次,这样就能短暂的让出cpu让别的线程执行(不知道理解的对不对)
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
// Do "yield" to let other views to execute their runnable as well
// Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
dispatcher.dispatch(this, this)
return
}
continue
}
synchronized(workerAllocationLock) {
--runningWorkers
if (queue.size == 0) return
++runningWorkers
fairnessCounter = 0
}
}
}
}
由上可以看出Dispatchers.IO 任务分发是借助于DefaultScheduler,也就是Dispatchers.Default的能力,因此两者是共用一个线程池。
只是Dispatchers.IO 比较特殊,它有个队列,该队列作用:
当IO 任务分派个数超过设定的并行数时,不会直接进行分发,而是先存放在队列里。
到此协程线程池基本原理分析完毕,下篇打算探讨一些协程的常用案例,看看哪些适合应用在实际开发中以及注意事项。
网友评论