美文网首页一些收藏奶牛刀知识点
全方位解析-Android中的线程池

全方位解析-Android中的线程池

作者: g小志 | 来源:发表于2022-03-20 16:21 被阅读0次

    笔记文章,没有废话,句句关键

    线程池的优点

    1. 重用线程池里的线程,避免创建和销毁线程所带来的性能开销
    2. 有效控制最大并发数,避免造成线程间抢占系统资源而造成阻塞
    3. 提高线程可管理性,可以统一进行分配,调优和监控的能力

    Android中的线程池

    复用Java中的Executor接口,具体实现类为ThreadPoolExecutor,它有以下几个参数

    参数 说明 注释
    corePoolSize 线程池中核心线程数量 一直存活,即使处于闲置状态
    maximumPoolSize 最大能创建的线程数量(非核心线程,包含核心线程个数) 达到这个值后,后续任务会阻塞
    keepAliveTime 非核心线程最大存活时间 当设置allowCoreThreadTimeOut=true 同样会做用于核心线程,但通常不会这么做
    unit keepAliveTime的时间单位 TimeUnit中的枚举(时间单位)
    workQueue 等待队列execute 方法提交的Runnable存储在其中 如果线程池中的线程数量大于等于corePoolSize的时候,把该任务放入等待队列
    threadFactory 线程创建工程厂(用来创建线程的) 默认使用Executors.defaultThreadFactory() 来创建线程,线程具有相同的NORM_PRIORITY优先级并且是非守护线程
    handler 线程池的饱和拒绝策略(不常用) 阻塞队列已且没有空闲的线程,此时继续提交任务,就需要采取一种策略处理该任务,默认会抛出异常

    常用与关键方法

    • void execute(Runnable run)//提交任务,交由线程池调度
    • void shutdown()//关闭线程池,等待任务执行完成
    • void shutdownNow()//关闭线程池,不等待任务执行完成
    • int getTaskCount()//返回线程池找中所有任务的数量 (已完成的任务+阻塞队列中的任务)
    • int getCompletedTaskCount()//返回线程池中已执行完成的任务数量 (已完成的任务)
    • int getPoolSize()//返回线程池中已创建线程数量
    • int getActiveCount()//返回当前正在运行的线程数量
    • void terminated() 线程池终止时执行的策略

    线程池的运行状态

    线程存在运行状态,如下:

    状态 说明
    NEW 初始状态,线程被新建,还没调用start方法
    RUNNABLE 运行状态,把"运行中"和"就绪"统称为运行状态
    BLOCKED 阻塞状态,表示线程阻塞于锁
    WAITING 等待状态,需要其他线程通知唤醒
    TIME_WAITING 超时等待状态,表示可以在指定的时间超时后自行返回
    TERMINATED 终止状态,表示当前线程已经执行完毕
    在这里插入图片描述

    根据线程的运行状态思想,我们来展开线程池的运行状态:

    状态 说明
    RUNNABLE 表示当前线程池,存在正在运行的线程
    SHUTDOWN 关闭线程池,不在执行新的任务,但会执行完线程池正在运行的任务,和添加到队列中的任务,对应shutDown()方法
    STOP 立即关闭线程池,打断正在运行的任务,且不再处理等待队列中已添加的任务,对应shutDownNow()方法
    TIDYING shutDown() / shutDownNow()后进入此状态,表示队列和线程池为空,之后进入TERMINATED状态
    TERMINATED 终止状态,表示当前线程已经执行完毕,并调用 terminated() 通知外界
    在这里插入图片描述

    安卓中常用的四种线程池

    Executors.newFixedThreadPool()

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    1. 线程数量固定的线程池
    2. 核心线程与非核心线程数量相等,意味着只有核心线程。
    3. keepAliveTime = 0L,即使线程池中的线程空闲,也不会被回收。除非调用shutDown()或shutDownNow()去关闭线程池。
    4. 可以更快响应外界的请求,且任务阻塞队列为无边界队列(LinkedBlockingQueue()链表结构的阻塞队列),意味着任务可以无限加入线程池

    Executors.newScheduledThreadPool()

       public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
      public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE,
                  10L, MILLISECONDS,
                  new DelayedWorkQueue());
        }
    
    1. 核心线程数量固定,非核心线程是无限的,但非核心线程存活时间非常短(10毫秒),闲置后会被回收
    2. 适合执行定时任务和固定周期的重复任务
    3. DelayedWorkQueue()优先级队列(堆结构),会根据定时/延时时间进行排序,延时少,先执行

    Executors.newSingleThreadExecutor() 常用

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    1. 核心线程数为1的线程池。确保所有任务在同一线程执行,因此不用考虑线程同步问题
    2. 阻塞队列是无界的,可以一直接收任务 、

    Executors.newCachedThreadPool()

      public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    1. 没有核心线程,非核心线程无限,但执行完任务后,线程存活60秒
    2. SynchronousQueue() 是个特殊的队列,可以理解为无法存储元素的队列,因此线程池接收到任务就会创建线程去执行。当整个线程都处于空闲状态,60秒后,线程池中的线程都会因超时而被停止,不占用系统资源。
    3. 根据以上两点,此线程池适合执行耗时较少但频繁的任务执行

    线程池源码分析

    在分析源码前,先把线程池的工作流程图放出来。这样再去阅读源码会更加深刻

    在这里插入图片描述
    阅读源码的目的:
    • 了解线程池如何复用
    • 如何巧妙的使用线程池,根据需求去自定义线程池,以达到暂停/恢复,任务具有优先级,监控,分配,调优等
    • 学习线程池优秀的设计思想
    • 面试吹牛逼

    源码分析

    分析源码前看下ThreadPoolExecutor类的头注释:

    public class ThreadPoolExecutor extends AbstractExecutorService {
        /**
         * The main pool control state, ctl, is an atomic integer packing
         * two conceptual fields
         *   workerCount, indicating the effective number of threads
         *   runState,    indicating whether running, shutting down etc
         *
         * In order to pack them into one int, we limit workerCount to
         * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
         * billion) otherwise representable. If this is ever an issue in
         * the future, the variable can be changed to be an AtomicLong,
         * and the shift/mask constants below adjusted. But until the need
         * arises, this code is a bit faster and simpler using an int.
         *
         * The workerCount is the number of workers that have been
         * permitted to start and not permitted to stop.  The value may be
         * transiently different from the actual number of live threads,
         * for example when a ThreadFactory fails to create a thread when
         * asked, and when exiting threads are still performing
         * bookkeeping before terminating. The user-visible pool size is
         * reported as the current size of the workers set.
         *
         * The runState provides the main lifecycle control, taking on values:
         *
         *   RUNNING:  Accept new tasks and process queued tasks
         *   SHUTDOWN: Don't accept new tasks, but process queued tasks
         *   STOP:     Don't accept new tasks, don't process queued tasks,
         *             and interrupt in-progress tasks
         *   TIDYING:  All tasks have terminated, workerCount is zero,
         *             the thread transitioning to state TIDYING
         *             will run the terminated() hook method
         *   TERMINATED: terminated() has completed
         *
         * The numerical order among these values matters, to allow
         * ordered comparisons. The runState monotonically increases over
         * time, but need not hit each state. The transitions are:
         *
         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed
         *
         * Threads waiting in awaitTermination() will return when the
         * state reaches TERMINATED.
         *
         * Detecting the transition from SHUTDOWN to TIDYING is less
         * straightforward than you'd like because the queue may become
         * empty after non-empty and vice versa during SHUTDOWN state, but
         * we can only terminate if, after seeing that it is empty, we see
         * that workerCount is 0 (which sometimes entails a recheck -- see
         * below).
         */
    

    这里面有几段比较重要 :

         * The main pool control state, ctl, is an atomic integer packing
         * two conceptual fields
         *   workerCount, indicating the effective number of threads
         *   runState,    indicating whether running, shutting down etc
         *
         * In order to pack them into one int, we limit workerCount to
         * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
         * billion) otherwise representable. If this is ever an issue in
         * the future, the variable can be changed to be an AtomicLong,
         * and the shift/mask constants below adjusted. But until the need
         * arises, this code is a bit faster and simpler using an int.
    

    线程池控制状态:clt是一个int类型的原子数,它表示两个概念,workerCount表示有效线程,笼统来讲,相当于线程池运行的线程个数,runState,表示线程池运行的状态,如RUNNING SHUTDOWN等等。
    将这两个字段打包成一个int,int为4字节,有32位,后29位表示线程池的数量,最大可为5个亿,如果不够可以使用AtomicLong代替。当然,运行线程池根本达不到这个数量。

         *   RUNNING:  Accept new tasks and process queued tasks
         *   SHUTDOWN: Don't accept new tasks, but process queued tasks
         *   STOP:     Don't accept new tasks, don't process queued tasks,
         *             and interrupt in-progress tasks
         *   TIDYING:  All tasks have terminated, workerCount is zero,
         *             the thread transitioning to state TIDYING
         *             will run the terminated() hook method
         *   TERMINATED: terminated() has completed
    

    五种状态的含义,上面提到过。

         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed
    

    线程池状态间的变化。等于上面我们画的图:

    在这里插入图片描述
    结论:阅读源码可以适当看下类头说明,尤其是Android源码的类头。可以帮我们更好的理解源码

    从线程池入口进入,理解前面提到ctl是什么?

     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
                                    
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS; 
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    Integer.SIZE为32,因此 COUNT_BITS = Integer.SIZE - 3 = 29 CAPACITY表示线程池的容量:

    (1 << COUNT_BITS) - 1 ---> 100000000000000000000000000000(30位) -1 -->11111111111111111111111111111(29位)

    这代表int32位中,前3位表示线程池状态后面29位表示容量
    计算结果的值如下:

    在这里插入图片描述
    总结来说,状态值自上而下,数值越来越大

    AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 初始状态为RUNNING,线程数为0

    ctlOf(int rs, int wc) 用来获取int中的值,用来调用下面两个方法 :

    private static int runStateOf(int c) { return c & ~CAPACITY; } 获取线程池状态
    private static int workerCountOf(int c) { return c & CAPACITY; } 获取线程池中线程的有效线程数量

    计算方式如下:

    在这里插入图片描述

    理解上面的计算方式后,开始真正进入源码阅读

    ==入口函数:== Executor.execute()

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            int num=workerCountOf(c);
            //第一步
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //第二步
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }//第三步
            else if (!addWorker(command, false))
                reject(command);
        }
      // firstTask 待执行的任务 core 是否要启动核心线程
     private boolean addWorker(Runnable firstTask, boolean core) 
    

    注释翻译: 线程池的执行分3步

     if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
    
    • 如果小于corePoolSize核心线程数,直接创先新的线程,并将任务作为该线程的第一个任务(firstTask),因为是往新线程执行任务,肯定是第一个任务addWorker()方法的返回值,表示是否添加任务成功,core = true 启动核心线程 ,并直接return返回,不再向下执行
    //  if (workerCountOf(c) < corePoolSize)不满足,也就是运行线程大于等于核心线程,
    //  此时可能为核心线程达到最大,或核心与非核心共存
    //  那么继续判断,线程池为运行状态,同时加入队列是否成功,
      if (isRunning(c) && workQueue.offer(command)) {
                // 成功! 进行二次校验,因为此时线程池中的线程可能销毁
                // (比如非核心到达keepAliveTime指定存活时间而销毁)或某个线程调用shutDown()方法
                int recheck = ctl.get();
                // 如果不是RUNNING状态 ,删除刚才offer进队列的任务
                if (!isRunning(recheck) && remove(command))
                    // 执行拒绝策略 调用创建时的handle,执行 handler.rejectedExecution(command, this); ,默认抛出异常
                    reject(command);
                    // 如果线程池 没有正在运行的线程,那么这个线程可能调用了shutDown()方法,要关闭了,那么就加入个null任务,
                    //也就是不执行 新任务,去创建非核心线程 core = false 执行等待队列中的任务
                else if (workerCountOf(recheck) == 0)
                    // 添加个null任务
                    addWorker(null, false);
            }//(isRunning(c) && workQueue.offer(command))不满足,表示等待队列已满,添加任务失败,那么就创建非核心线程
            //如果创建失败。执行拒绝策略
             else if (!addWorker(command, false))
                reject(command);
    

    boolean addWorker(Runnable firstTask, boolean core) 方法逻辑:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry: //双层for循环 retry外层循环终止标识
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                // 如果rs >= SHUTDOWN不再添加新执行 后面的判断条件可以不关心
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //获取线程池中,正在执行任务的线程
                    int wc = workerCountOf(c);
                    // 大于容量,不可能,不用看, wc >= (core ? corePoolSize : maximumPoolSize 这个判断是
                    // core = true wc >= corePoolSize 核心线程到达上限 还要创建核心线程,不可以 返回false
                    // core = false wc >= maximumPoolSize 达到非核心(包含核心)线程上限 还要创建线程 返回false
                    if (wc >= CAPACITY || 
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                        //workerCount +1 
                    if (compareAndIncrementWorkerCount(c))
                       //返回外层循环 ,循环结束
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 创建 Worker包装我们的任务 firstTask  
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
                         // rs < SHUTDOWN 表示是RUNNING   状态 
                         // (rs == SHUTDOWN && firstTask == null) 表示SHUTDOWN状态 且没有不再执行新的任务
                        //2种情况都会继续执行
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                                //上面2中情况 都会加入workers集合
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                                // 设置 workerAdded = true;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    //workerAdded = true; 上面设置过了
                    if (workerAdded) {
                    // 启动线程 
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
           
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                //赋值 firstTask 并用getThreadFactory()得到我们创建时传入的线程工厂调用newThread(this);创建线程,并将自己作为Runnable传入
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    }
    

    Worker是个Runnable,上面的w = new Worker(firstTask); 方法其实就是对任务的包装 在构造方法中赋值 firstTask 并用getThreadFactory()得到我们传入的线程工厂调用newThread(this);创建线程,并将自己作为Runnable传入。当addWorker()方法调用 t.start() 就会执行Worker类中的run()方法。

    Worker类

     /** Delegates main run loop to outer runWorker. */
    public void run() {
                runWorker(this);
            }
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            //赋值firstTask给task
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //要执行的任务不为空直接执行,如果等于空调用getTask()去等待队列取出任务
                //对应了最开始 (workerCountOf(recheck) == 0) addWorker(null, false); 这段判断条件
                //之前提过 添加空任务的目的就是不再执行新任务,去执行等待队列的任务。因此getTask()得以执行 
                
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //执行任务前调用此时已经在线程中运行 ,因为这是在Worker中的run方法中
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                        //执行我们传入的Runnable
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                        //执行任务后调用
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    getTask()

     private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                //allowCoreThreadTimeOut 表示非核心线程的超时时间keepAliveTime是否作用于核心线程,默认是false
                // 所以只看后面的判断。  wc > corePoolSize;表示大于核心线程 true
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                //wc > corePoolSize;表示大于核心线程 true 那么就是创建非核心线程,非核心线程取等待队列的任务有超时时间 
                //keepAliveTime时间内阻塞。取不到 继续向下执行 任务完成 非核心线程执行完销毁
                // wc <= corePoolSize; 小于或等于核心线程 workQueue.take();一直阻塞,当队列有任务时立马执行,这也是为什么核心线程一直存在的原因
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    总结:

    • 核心线程一直存在的原因的是workQueue.take();,非核心线程keepAliveTime时间后销毁的原因是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
    • 从以下代码可以看出。线程池中的任务达到corePoolSize线程数时,所有任务都会先添加到队列,当调用workQueue.offer(command)时,被 workQueue.take();阻塞的核心线程就会拿到任务,然后去执行,这很关键
    • if (isRunning(c) && workQueue.offer(command)) 里的条件当都都不满足时,什么都不会做,之后会往队列中添加元素。当核心线程有空间是getTask中take()阻塞的核心线程会立马取队列取出个任务执行
    • beforeExecute(wt, task);afterExecute(task, thrown);会在任务执行前后执行
    • 核心线程与非核心线程没有本质上的区别。只是在获取任务时,通过此判断boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;,非核心超时会结束,而非核心则会take()阻塞住
    int c = ctl.get();
            int num=workerCountOf(c);
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
    

    通过源码 得出前面提到的结论

    在这里插入图片描述

    实战

    根据学习到的源码,封装自己的线程池工具类,它有一下几个特点:

    /**
     * 1. 支持按任务的优先级去执行,
     * 2. 支持线程池暂停.恢复(批量文件下载,上传) ,
     * 3. 异步结果主动回调主线程
     * todo 线程池能力监控,耗时任务检测,定时,延迟,
     *
     * 根据源码分析 :当工作线程小于核心线程是不会进入队列
     * 当核心线程启动后 每次都会进入队列
     */
    object MyExecutor {
        private var hiExecutor: ThreadPoolExecutor
    
        //暂停线程表示:hint:这里为啥不用volatile修饰?
        private  var  isPause: Boolean = false
    
        private val lock: ReentrantLock = ReentrantLock()
    
        private val condition: Condition = lock.newCondition()
    
        //主线程回调能力的Handler
        private val handler: Handler = Handler(Looper.getMainLooper())
        private val seq = AtomicLong()
    
        /**
         * 初始化构建参数
         */
        init {
            //获取cpu数
            val cpuCount = Runtime.getRuntime().availableProcessors()
            //核心线程数为 cpu+1
            val corePoolSize = cpuCount + 1
            //非核心线程数
            val maxPoolSize = corePoolSize * 2 + 1
            //非核心线程超时销毁时间
            val keepAliveTime = 30L
            //超时时间单位 s
            val unit = TimeUnit.SECONDS
            //相当于默认实现 创建一个线程
            val threadFactory = ThreadFactory() {
                val thread = Thread(it)
                //hi-executor-0
                thread.name = "hi-executor-" + seq.getAndIncrement()
                return@ThreadFactory Thread(it)
            }
    
    
    
            val priorityBlockingQueue: PriorityBlockingQueue<out Runnable> = PriorityBlockingQueue()
            hiExecutor = object : ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                unit,
                priorityBlockingQueue as BlockingQueue<Runnable>,
                threadFactory
            ) {
                override fun beforeExecute(t: Thread?, r: Runnable?) {
                    priorityBlockingQueue.size
                    if (isPause) {
                        try {
                            //条件唤醒线程标准写法
                            lock.lock()
                            condition.await()
                        } finally {
                            lock.unlock()
                        }
    
                    }
                }
    
                override fun afterExecute(r: Runnable?, t: Throwable?) {
                    //监控线程池耗时任务,线程创建数量,正在运行的数量
                    HiLog.e("已执行完的任务的优先级是:" + (r as PriorityRunnable).priority.toString() + Thread.currentThread().name)
                    priorityBlockingQueue.size
                }
            }
        }
    
        /**
         * priority越小 优先级越高
         * @IntRange 优先级范围
         *  @JvmOverloads 重载方法 生成多种重载方法
         *  无返回结果 与正常线程池一样
         */
        @JvmOverloads
        fun execute(@IntRange(from = 0, to = 10) priority: Int = 0, runnable: Runnable) {
            hiExecutor.execute(PriorityRunnable(priority, runnable))
        }
    
    
        @JvmOverloads
        fun execute(@IntRange(from = 0, to = 10) priority: Int = 0, callable: Callable<*>) {
            val newFixedThreadPool = Executors.newCachedThreadPool()
    
            hiExecutor.execute(PriorityRunnable(priority, callable))
        }
    
        /**
         * 提供任务执行后返回结果
         * 并回调主线程的能力
         *
         * <T> 返回结果类型
         */
        abstract class Callable<T> : Runnable {
    
            override fun run() {
                handler.post {
                    onPrepare()
                }
                //真正在线程中执行
                val t = onBackground()
    
                handler.post {
                    onCompleted(t)
                }
            }
    
            /**
             * UI线程执行
             */
            open fun onPrepare() {
                //预处理 加载动画
            }
    
            /**
             * 线程池中执行
             */
            abstract fun onBackground(): T
    
            /**
             * UI线程执行
             * 执行完成
             */
            abstract fun onCompleted(t: T)
        }
    
        /**
         * 重写的意义在于 PriorityBlockingQueue 这个具有排序的堆结构 每个元素要有比较大小的能力
         * priority 线程优先级 runnable执行任务的
         */
        class PriorityRunnable(val priority: Int, private val runnable: Runnable) : Runnable,
            Comparable<PriorityRunnable> {
            override fun run() {
                runnable.run()
            }
    
            override fun compareTo(other: PriorityRunnable): Int {
                //倒序 priority越小 优先级越高
                return if (this.priority < other.priority) 1 else if (this.priority > other.priority) -1 else 0
            }
    
        }
    
        //可能多个线程调用此方法
        @Synchronized
        fun pause() {
            isPause = true
            HiLog.e("hiexecutor is paused")
    
        }
    
        @Synchronized
        fun resume() {
            isPause = false
            lock.lock()
            try {
                condition.signalAll()
            } finally {
                lock.unlock()
            }
            HiLog.e("hiexecutor is resume")
        }
    }
    

    测试代码:

        private void priorityExecutor() {
            for (int priority = 0; priority < 10; priority++) {
                int finalPriority = priority;
                HiExecutor.INSTANCE.execute(priority, () -> {
                    try {
                        Thread.sleep(1000 - finalPriority * 100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    

    利用源码分析后,我们可以起送封装自己的线程池,它具有以下几种能力:

    1. 支持按任务的优先级去执行,
    2. 支持线程池暂停.恢复(批量文件下载,上传) ,
    3. 异步结果主动回调主线程
    4. 线程池能力监控,耗时任务检测,定时,延迟,

    但执行中有2个问题:

    • 明明是使用优先队列,但为什么第一次的时候,任务优先级却是无序的,第二次就好了?
    • 使用pause()方法,线程还在执行?

    对于问题1。

    在这里插入图片描述

    首先线程池的调度是,系统Cpu利用时间碎片随机去调度的。我们的设置的只能尽可能的去满足,按照优先级去执行,但不能保证。更重要的是。通过源码得知。当线程池中,核心线程数未到最大值时(测试例子中是5),是不会加入到队列的,因此也就不会排序。当第二次执行任务,线程池就会先加入队列。然后被take()阻塞的队列取到任务,然后执行,此时就有了优先级。 解决:可以在应用启动白屏页启动线程。异步去初始化第三方库,或其他不需要优先级的任务。这样之后的任务就都具有优先级了

    对于问题2:

    在这里插入图片描述
    通过源码分析:
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
        
    我们重写的方法
    override fun beforeExecute(t: Thread?, r: Runnable?) {
                    priorityBlockingQueue.size
                    if (isPause) {
                        try {
                            //条件唤醒线程标准写法
                            lock.lock()
                            condition.await()
                        } finally {
                            lock.unlock()
                        }
    
                    }
                }
    

    beforeExecute(wt, task);中的方法,只能阻塞当前线程,和后续要执行的线程,已经在线程中开始执行的任务是无法暂停的。 这其实不是问题。只是种误解

    总结

    通过学习源码,我们的收获:

    • 学习了int字段的多重利用,试想下,如果不采用源码中的方式,我们至少要维护这几种字段:sunStatusrunningWorkCountmaxWorkCountshutDownWorkCount,还要保证他们的线程安全。这是种值得借鉴的优秀思想。ARouter中也用到类似思想(extars)
    • 清楚线程池的状态和工作流程,理解线程池如何复用与非核心线程的销毁
    • 找到插手线程池的思路,扩展自己的线程池
    • 通过学习源码,还解决了2个看似不合理,但却在源码中有答案的问题。

    相关文章

      网友评论

        本文标题:全方位解析-Android中的线程池

        本文链接:https://www.haomeiwen.com/subject/oxqadrtx.html