美文网首页Android开发Android开发经验谈Android技术知识
Kotlin 协程之线程池探索之旅(与Java线程池PK)

Kotlin 协程之线程池探索之旅(与Java线程池PK)

作者: 小鱼人爱编程 | 来源:发表于2022-06-30 17:34 被阅读0次

    前言

    上篇文章分析了协程切换到主线程执行的详细流程,本篇将分析如何切换到子线程执行。
    通过本篇文章,你将了解到:

    1. 切换到子线程场景
    2. Dispatchers.Default 分发流程详解
    3. Dispatchers.IO 分发流程详解
    4. 与Java 线程池比对
    5. 协程到底在哪个线程执行?

    1. 切换到子线程场景

    Demo 展示

    先看一个最常见的网络请求Demo:

        fun showStuName() {
            GlobalScope.launch(Dispatchers.Main) {
                var stuInfo = withContext(Dispatchers.IO) {
                    //模拟网络请求
                    Thread.sleep(3000)
                    "我是小鱼人"
                }
                //展示
                Toast.makeText(context, stuInfo, Toast.LENGTH_SHORT).show()
            }
        }
    

    因为是耗时操作,因此需要切换到子线程处理,又因为是网络请求,属于I/O操作,因此使用Dispatchers.IO 分发器。

    若任务偏向于计算型,比较耗费CPU,可以改写如下:

        fun dealCpuTask() {
            GlobalScope.launch(Dispatchers.Main) {
                //切换到子线程    
                withContext(Dispatchers.Default) {
                    var i = 0
                    val count = 100000
                    while(i < count) {
                        Thread.sleep(1)
                    }
                }
            }
        }
    

    Dispatchers.IO/Dispatchers.Default 异同

    两者都是协程分发器,Dispatchers.IO 侧重于任务本身是阻塞型的,比如文件、数据库、网络等操作,此时是不怎么占用CPU的。而Dispatchers.Default 侧重于计算型的任务,可能会长时间占用CPU。
    协程线程池在设计的时候,针对两者在线程的调度策略上有所不同。


    image.png

    2. Dispatchers.Default 分发流程详解

    任务分发

    以上面的Demo 为例,从源码角度分析分发流程。
    从前面的文章很容易了解到:withContext()函数里构造了DispatchedContinuation,它本身也是个Runnable,通过:

    //this 指DispatchedContinuation 本身
    dispatcher.dispatch(context, this)
    

    进行分发。
    而dispatcher 就是分发器,我们这里用的是Dispatchers.Default,因此先来看看它的实现。

    #Dispatchers.kt
    actual object Dispatchers {
        @JvmStatic
        actual val Default: CoroutineDispatcher = createDefaultDispatcher()
        @JvmStatic
        public val IO: CoroutineDispatcher = DefaultScheduler.IO
        @JvmStatic
        public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    }
    

    可以看出Dispatchers 是个单例。

    #CoroutineContext.kt
    //useCoroutinesScheduler 默认为true
    //使用DefaultScheduler
    internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
        if (useCoroutinesScheduler) DefaultScheduler else CommonPool
    
    #Dispatcher.kt
    internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
        //定义IO 分发器
        //...
    }
    

    DefaultScheduler 也是个单例,内容不多,其功能实现还得继续往上看。
    ExperimentalCoroutineDispatcher 定义如下:

    #Dispatcher.kt
    open class ExperimentalCoroutineDispatcher(
        //核心线程数
        private val corePoolSize: Int,
        //最大线程个数
        private val maxPoolSize: Int,
        //空闲线程的存活时间
        private val idleWorkerKeepAliveNs: Long,
        //线程名前缀
        private val schedulerName: String = "CoroutineScheduler"
    ) : ExecutorCoroutineDispatcher() {
        constructor(
            //初始化参数的值
            corePoolSize: Int = CORE_POOL_SIZE,
            maxPoolSize: Int = MAX_POOL_SIZE,
            schedulerName: String = DEFAULT_SCHEDULER_NAME
        ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
        
        //真正的线程池实现
        private var coroutineScheduler = createScheduler()
        //分发
        override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
            try {
                //分发实现
                coroutineScheduler.dispatch(block)
            } catch (e: RejectedExecutionException) {
                //...
            }
    }
    
    //真正的线程池实现为:CoroutineScheduler
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    

    查看CoroutineScheduler.dispatch()函数:

    //block 为DispatchedContinuation
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        //构建Task对象,block 本身就是Task类型
        val task = createTask(block, taskContext)
        //当前线程是否是Worker类型,也就是说当前线程是否是线程池内的线程
        val currentWorker = currentWorker()//①
        //如果是,则尝试提交任务到本地队列,否则返回任务本身
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)//②
        if (notAdded != null) {
            //如果没有提交到本地队列,则提交到全局队列 
            if (!addToGlobalQueue(notAdded)) {//③
                //添加队列失败则抛出异常
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        //是否需要跳过唤醒线程,主要用在IO分发器
        val skipUnpark = tailDispatch && currentWorker != null
        if (task.mode == TASK_NON_BLOCKING) {//④
            if (skipUnpark) return
            //非阻塞任务,唤醒cpu 线程
            signalCpuWork()//⑤
        } else {
            //阻塞任务,唤醒blocking 线程
            signalBlockingWork(skipUnpark = skipUnpark)//⑥
        }
    }
    

    这函数是分发核心,注释里标明了6个点,现在一一阐述:

        private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
    

    Worker 本身是继承自Thread 的,因此Worker 是线程类,代表线程池里的线程。通过判断是否是Worker类型来确认当前线程是否属于线程池内的线程。

    private fun CoroutineScheduler.Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
        //Worker 为空,直接返回任务本身
        if (this == null) return task
        //非阻塞的任务,则直接返回
        if (task.mode == TASK_NON_BLOCKING && state === CoroutineScheduler.WorkerState.BLOCKING) {
            return task
        }
        //表示本地队列里存有任务了
        mayHaveLocalTasks = true
        //加入到本地队列里
        //localQueue 为Worker的成员变量
        return localQueue.add(task, fair = tailDispatch)
    }
    


    若是②没有成功加入到本地队列里,则尝试加入到全局队列里:

    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            //加入到阻塞队列
            globalBlockingQueue.addLast(task)
        } else {
            //加入到cpu队列
            globalCpuQueue.addLast(task)
        }
    }
    

    结合②③分析,目前为止,出现了三个队列:


    image.png


    主要用于判断任务是阻塞还是非阻塞的,这在任务构造的时候就已经指定,若是使用Dispatchers.Default 分发器,那么构造的任务是非阻塞的,而使用Dispatchers.IO,则构造的任务是阻塞的。


    ⑤⑥ 是针对阻塞与否进行不同的处理。

    fun signalCpuWork() {
        //尝试去唤醒正在挂起的线程,若是有线程可以被唤醒,则无需创建新线程
        if (tryUnpark()) return
        //若唤醒不成功,则需要尝试创建线程
        if (tryCreateWorker()) return
        //再试一次,边界条件
        tryUnpark()
    }
    

    tryUnpark()函数主要作用是从栈里取出挂起的线程(Worker),入栈的的时机是当线程没有任务可以处理时进行挂起,此时会记录在栈里。
    重点是tryCreateWorker()函数:

    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
        //获取当前已经创建的线程数
        val created = createdWorkers(state)
        //获取当前阻塞的任务数
        val blocking = blockingTasks(state)
        //已创建的线程数-阻塞的任务数=非阻塞的线程数
        //coerceAtLeast(0) 表示结果至少是0
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        //如果非阻塞数小于核心线程数
        if (cpuWorkers < corePoolSize) {
            //创建线程
            val newCpuWorkers = createNewWorker()
            //如果当前只有一个非阻塞线程并且核心线程数>1,那么再创建一个线程
            //目的是为了方便"偷"任务...
            if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
            //创建成功
            if (newCpuWorkers > 0) return true
        }
        return false
    }
    

    创建新线程为什么与阻塞任务的多少关联呢?
    简单举个例子:

    • 现在若是已经创建了5个线程,而这几个线程都在执行IO任务,此时就需要再创建新的线程来执行任务,因为此时CPU是空闲的。
    • 只要非阻塞任务的个数小于核心线程数,那么就需要创建新的线程,目的是为了充分利用CPU。

    再看createNewWorker() 是如何创建新的线程(Worker)的。

    private fun createNewWorker(): Int {
        //workers 为Worker 数组,因为需要对数组进行add 操作,因此需要同步访问
        synchronized(workers) {
            if (isTerminated) return -1
            val state = controlState.value
            //获取已创建的线程数
            val created = createdWorkers(state)
            //阻塞的任务数
            val blocking = blockingTasks(state)
            //非阻塞的线程数
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            //非阻塞的线程数不能超过核心线程数
            if (cpuWorkers >= corePoolSize) return 0
            //已创建的线程数不能大于最大线程数
            if (created >= maxPoolSize) return 0
            val newIndex = createdWorkers + 1
            require(newIndex > 0 && workers[newIndex] == null)
            //构造线程
            val worker = Worker(newIndex)
            //记录到数组里
            workers[newIndex] = worker
            //记录创建的线程数
            require(newIndex == incrementCreatedWorkers())
            //开启线程
            worker.start()
            //当前非阻塞线程数
            return cpuWorkers + 1
        }
    }
    


    signalBlockingWork()函数调用时会记录阻塞的任务数,其它与signalCpuWork 一致。

    至此,Dispatchers.Default 任务分发流程已经结束,其重点:

    • 构造任务,添加到队列里(三个队列中选一个)。
    • 唤醒挂起的线程或是创建新的线程。

    任务执行

    既然任务都提交到队列了,该线程出场执行任务了。

    internal inner class Worker private constructor() : Thread() {}
    

    Worker 创建并启动后,将会执行run()函数:

    override fun run() = runWorker()
    private fun runWorker() {
        var rescanned = false
        //一直查找,除非worker终止了
        while (!isTerminated && state != CoroutineScheduler.WorkerState.TERMINATED) {
            //从队列里寻找任务
            //mayHaveLocalTasks:本地队列里是否有任务
            val task = findTask(mayHaveLocalTasks) //①
            if (task != null) {
                rescanned = false
                minDelayUntilStealableTaskNs = 0L
                //任务获取到后,执行任务
                executeTask(task)//②
                //任务执行完毕,继续循环查找任务
                continue
            } else {
                mayHaveLocalTasks = false
            }
            if (minDelayUntilStealableTaskNs != 0L) {
                //延迟一会儿,再去偷
                if (!rescanned) {
                    rescanned = true
                } else {
                    //挂起一段时间
                }
                continue
            }
            //尝试挂起
            tryPark()//③
        }
        //释放token
        tryReleaseCpu(CoroutineScheduler.WorkerState.TERMINATED)
    }
    

    同样的,标注了3个重点,一一分析之。


    findTask()顾名思义:找任务。
    传入的参数表示是否扫描本地队列,若是之前有提交任务到本地队列,则此处mayHaveLocalTasks = true。

    fun findTask(scanLocalQueue: Boolean): Task? {
        //尝试获取cpu 许可
        //若是拿到cpu 许可,则可以执行任何任务
        if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
        //拿不到,若是本地队列有任务,则从本地取,否则从全局阻塞队列取
        val task = if (scanLocalQueue) {
            localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
        } else {
            globalBlockingQueue.removeFirstOrNull()
        }
        //都拿不到,则偷别人的
        return task ?: trySteal(blockingOnly = true)
    }
    
    private fun findAnyTask(scanLocalQueue: Boolean): Task? {
        if (scanLocalQueue) {
            //可以从本地队列找
            val globalFirst = nextInt(2 * corePoolSize) == 0
            if (globalFirst) pollGlobalQueues()?.let { return it }
            localQueue.poll()?.let { return it }
            if (!globalFirst) pollGlobalQueues()?.let { return it }
        } else {
            //从全局队列找
            pollGlobalQueues()?.let { return it }
        }
        //偷别人的
        return trySteal(blockingOnly = false)
    }
    

    此处解释一下获取cpu 许可的含义:

    它和核心线程数相关,假设我们是8核CPU,那么同一时间最多只能有8个线程在CPU上执行。因此,若是其它线程想要执行非阻塞任务(占用CPU),需要申请许可(token),申请成功说明有CPU空闲,此时该线程可以执行非阻塞任务。否则,只能执行阻塞任务。

    当从本地队列、全局队列里都没找出任务时,当前的Worker打起了别个Woker的主意。我们知道全局队列是所有Worker共享,而本地队列是每个Worker私有的。因此,当前Worker发现自己没任务可以执行的时候会去看看其它Worker的本地队列里是否有可以执行的任务,若是有就可以偷过来用。
    看看如何偷的:

    private fun trySteal(blockingOnly: Boolean): Task? {
        //自己本地没有才能偷
        kotlinx.coroutines.assert { localQueue.size == 0 }
        //所有的已创建的workers个数
        val created = createdWorkers
        //遍历workers数组
        repeat(created) {
            ++currentIndex
            if (currentIndex > created) currentIndex = 1
            val worker = workers[currentIndex]
            if (worker !== null && worker !== this) {
                //从别的worker里的本地队列偷到自己的本地队列
                val stealResult = if (blockingOnly) {
                    localQueue.tryStealBlockingFrom(victim = worker.localQueue)
                } else {
                    localQueue.tryStealFrom(victim = worker.localQueue)
                }
                //偷成功,则取出任务
                if (stealResult == TASK_STOLEN) {
                    return localQueue.poll()
                } else if (stealResult > 0) {
                    minDelay = min(minDelay, stealResult)
                }
            }
        }
        //...没偷到
        return null
    }
    

    实际上偷的本质是:

    从别人的本队队列里取出任务放到自己的本地队列,最后取出任务返回。


    拿到任务后,就开始执行任务。

    private fun executeTask(task: Task) {
        //模式:阻塞/非阻塞
        val taskMode = task.mode
        idleReset(taskMode)
        //当前任务是非阻塞任务,则尝试释放cpu token,并执行signalCpuWork
        beforeTask(taskMode)
        //真正执行任务
        runSafely(task)
        //修改状态
        afterTask(taskMode)
    }
    fun runSafely(task: Task) {
        try {
            //task 其实就是DispatchedContinuation
            task.run()
        } catch (e: Throwable) {
            //..
        } finally {
            unTrackTask()
        }
    }
    

    此时线程正式执行任务了。


    若是线程没有找到任何任务执行,则尝试挂起。

    private fun tryPark() {
        //没有在挂起栈里
        if (!inStack()) {
            //将worker放入挂起栈里
            parkedWorkersStackPush(this)
            return
        }
        //...
        while (inStack() && workerCtl.value == CoroutineScheduler.PARKED) { // Prevent spurious wakeups
            if (isTerminated || state == CoroutineScheduler.WorkerState.TERMINATED) break
            //...
            //真正挂起,并标记worker state 状态
            park()
        }
    }
    

    最后一步的park()里会修改state = WorkerState.TERMINATED,在最外层的循环里会判断该标记,若是终止了,则循环停止,整个线程执行结束。

    至此,任务执行流程结束,其重点:

    1. 从全局队列、本地队列里查找任务。
    2. 若是没找到,则尝试从别的Worker 本地队列里偷取任务。
    3. 1、2 能够找到任务则执行Runnable.run()函数,该函数里最终会执行协程体里的代码。
    4. 若是没有任务,则根据策略挂起一段时间或是最终退出线程的执行。

    结合任务分发与任务执行流程,有如下流程图:


    image.png

    3. Dispatchers.IO 分发流程详解

    Dispatchers.IO 定义

    internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
        //创建LimitingDispatcher
        val IO: CoroutineDispatcher = LimitingDispatcher(
            this,
            systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
            "Dispatchers.IO",
            TASK_PROBABLY_BLOCKING
        )
        //...
    }
    

    Dispatchers.IO 作为DefaultScheduler 里的成员变量,并且它的分发器使用的是DefaultScheduler 本身。
    构造函数里指明了并行的数量限制,以及它属于TASK_PROBABLY_BLOCKING(阻塞任务)。

    任务分发

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            //记录在等待执行的任务
            val inFlight = inFlightTasks.incrementAndGet()
            
            //如果小于并行数
            if (inFlight <= parallelism) {
                //直接分发 dispatcher= DefaultScheduler
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
            //等待执行的任务超过并行数,则加入到队列里
            queue.add(taskToSchedule)
            
            //碰运气,看是否有任务释放了
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }
            //若释放了,则取出队列里的任务执行
            taskToSchedule = queue.poll() ?: return
        }
    }
    

    可以看出Dispatchers.IO 任务分发是借助于DefaultScheduler,也就是Dispatchers.Default的能力,因此两者是共用一个线程池。
    只是Dispatchers.IO 比较特殊,它有个队列,该队列作用:

    当IO 任务分派个数超过设定的并行数时,不会直接进行分发,而是先存放在队列里。

    那它什么时候取出来呢?
    当任务执行完毕,也就是DispatchedTask.run()函数执行完毕后会调用:
    taskContext.afterTask(),来看它的实现:

    override fun afterTask() {
        //从队列里取出
        var next = queue.poll()
        if (next != null) {
            //继续分发
            dispatcher.dispatchWithContext(next, this, true)
            return
        }
    
        inFlightTasks.decrementAndGet()
        //...
    }
    

    举个简单例子:

    假设现在最大的并行数是64,线程池分配了64个线程执行IO任务,当第65个任务到来之时,因为超出了64,因此会放入队列里。当64个任务有某个任务执行完毕后,会从队列里取出第65个任务进行分发。

    这样做的目的是什么呢?

    为了限制突然间创建了许多IO线程,浪费资源,因此在线程池之外再加了一层防护,多出的任务先进入缓冲队列。

    4. 与Java 线程池比对

    使用过Java 线程池的小伙伴可能会知道,Java 线程池与Kotlin协程池 本质上都是:"池化技术的体现”
    它们的优势:

    1. 减少线程频繁开启/关闭的资源消耗。
    2. 及时响应并执行任务。
    3. 较好地管控/监控 应用内的线程使用。

    Java 线程池原理:

    1. 核心线程+队列+非核心线程。
    2. 首先使用核心线程执行任务,若是核心线程个数已满,则将任务加入到队列里,核心线程从队列里取出任务执行,若是队列已满,则再开启非核心线程执行任务。

    更详细的Java 线程池原理与使用请移步:Java 线程池之必懂应用-原理篇(上)

    协程线程池原理:

    1. 全局队列(阻塞+非阻塞)+ 本地队列。
    2. IO 任务分发还有个缓存队列。
    3. 线程从队列里寻找任务(包括偷)并执行,若是使用IO 分发器,则超出限制的任务将会放到缓存队列里。

    两者区别:

    • Java 线程池开放API,比较灵活,调用者可以根据不同的需求组合不同形式的线程池,没有区分任务的特点(阻塞/非阻塞)。
    • 协程线程池专供协程使用,区分任务特点,进而进行更加合理的调度。

    5. 协程到底在哪个线程执行?

    回到我们上篇末尾的问题:

        fun launch3() {
            GlobalScope.launch(Dispatchers.IO) {
                println("1>>>${Thread.currentThread()}")
                withContext(Dispatchers.Default) {
                    println("2>>>${Thread.currentThread()}")
                    delay(2000)
                    println("3>>>${Thread.currentThread()}")
                }
                println("4>>>${Thread.currentThread()}")
            }
        }
    

    理解了线程池原理,答案就呼之欲出了。
    1、4 可能不在同一线程。
    2、3 可能不在同一线程。
    1、2 可能在同一线程。

    看到这结果,你可能会觉得:废话!
    容我解释:因为线程池本身的调度侧重于执行任务,而非使用哪个特定的线程执行,因此具体分派到哪个线程执行需要看哪个线程刚好拿到了任务。

    下篇将分析协程的取消与异常处理,敬请关注。

    本文基于Kotlin 1.5.3,文中完整Demo请点击

    您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

    持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

    1、Android各种Context的前世今生
    2、Android DecorView 必知必会
    3、Window/WindowManager 不可不知之事
    4、View Measure/Layout/Draw 真明白了
    5、Android事件分发全套服务
    6、Android invalidate/postInvalidate/requestLayout 彻底厘清
    7、Android Window 如何确定大小/onMeasure()多次执行原因
    8、Android事件驱动Handler-Message-Looper解析
    9、Android 键盘一招搞定
    10、Android 各种坐标彻底明了
    11、Android Activity/Window/View 的background
    12、Android Activity创建到View的显示过
    13、Android IPC 系列
    14、Android 存储系列
    15、Java 并发系列不再疑惑
    16、Java 线程池系列
    17、Android Jetpack 前置基础系列
    18、Android Jetpack 易懂易学系列
    19、Kotlin 轻松入门系列
    20、Kotlin 协程系列全面解读

    相关文章

      网友评论

        本文标题:Kotlin 协程之线程池探索之旅(与Java线程池PK)

        本文链接:https://www.haomeiwen.com/subject/qeatbrtx.html