Android 多线程之线程池(二)

作者: JingChen_ | 来源:发表于2020-12-26 12:32 被阅读0次

    一 线程池中的一些重要概念

    Worker / workers

    Worker类是ThreadPoolExecutor类的一个内部类,也是线程池管理操作线程的核心所在。每一个worker都对应着一个thread,所以在不混淆的情况下,可以把worker理解为工作线程。

    ThreadPoolExecutor有一个名为workers的成员变量,它保存了这个线程池所有存活的worker对象。

    workQueue

    workQueue是线程池内部用来保存待执行任务的队列。它是一个BlockingQueue<Runnable>类型的变量,在没有任务的时候,它的poll()方法会阻塞。

    在一个允许超时的worker执行完任务之后,会调用workQueue.poll()取出下一个任务执行。如果没有任务,则会在这里阻塞;当阻塞时间达到超时时间后,这个工作线程会退出并销毁。

    一些简单的线程池使用方法,可以看看这篇文章Android 多线程之线程池(一)

    二 源码分析

    ctl

    ThreadPoolExecutor通过一个原子整型ctl来保存线程池的两个重要字段,workerCount和runState。workerCount即线程池工作线程的数量,而runState代表了线程池当前的状态(如:运行中、关闭、终止)。通过位运算,可以从ctl得到workerCount和runState的值,反之也可以通过workerCount和runState组合得到ctl。

    //ThreadPoolExecutor.java
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    execute(runnable)

    //ThreadPoolExecutor.java
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    当线程池执行一个任务会调这个方法进来,回想下之前讲过线程池的工作流程是怎么样的Android 多线程之线程池(一),再来看看源码会加深印象。

    分析:
    1、先判断任务是否为空,如果为空直接抛异常。
    2、如果当前运行的线程小于核心线程数,那么就去addWorker(command, true),这个方式返回一个Boolen值,意思是创建一个worker是否成功,第一个参数是任务,第二个参数是创建的worker的线程是否为核心线程。
    3、如果当前线程池是运作的,并且这个任务workQueue.offer(command)添加进队列,那么继续往下走,如果添加进队列失败,则去addWorker(command, false)创建一个普通线程并把任务对给他执行,如果创建失败,则拒绝这个任务。
    4、如果3的条件重新取到ctl,如果这个线程池不是运作的,并且这个任务已经在任务队列移除了,那么直接拒绝;如果上述条件不成立则判断workerCountOf(recheck) == 0 线程池没有工作线程的时候,则去创建普通线程的worker。


    线程池工作原理.png

    addWorker(Runnable firstTask, boolean core)

    //ThreadPoolExecutor.java
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    先去判断线程池的状态if (rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty()))return false;
    直接返回false有以下3种情况:
    1、线程池状态为STOP、TIDYING、TERMINATED
    2、线程池状态不是running状态,并且firstTask不为空
    3、线程池状态不是running状态,并且工作队列为空
    当返回false时则进入拒绝策略

    之后去判断

    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
    

    addWorker的core的值,如果是核心线程,当前的workerCountOf大于corePoolSize则返回false;如果是非核心线程,当前的workerCountOf大于maximumPoolSize则返回false,当返回false时则进入拒绝策略

    如果上述条件满足,compareAndIncrementWorkerCount(c)表示workerCountOf+1

    接下来就是去创建一个worker了,相当于创建一个工作线程,之后把work添加到队列里面workers.add(w);如果workers.size() > largestPoolSize 当前的工作线程(包含未启动)大于最大工作线程,那么最大工作线程设置为队列的工作线程

      if (workerAdded) {
           t.start();
           workerStarted = true;
      }
    

    执行work对象里面的Thread

    ThreadPoolExecutor.Worker(Runnable firstTask)、runWorker(Worker)

    //ThreadPoolExecutor.java
    Worker(Runnable firstTask) {
       setState(-1); // inhibit interrupts until runWorker
       this.firstTask = firstTask;
       this.thread = getThreadFactory().newThread(this);
    }
    
    public void run() {
       runWorker(this);
    }
    

    刚才有提及到worker可以理解为工作线程,原因是worker.thread是工作线程,而worker.firstTask为该线程处理的任务

    当Worker的线程开始运行之后,会调用其run()方法:runWorker(this)

    // 省略一大部分
        final void runWorker(Worker w) {
            Runnable task = w.firstTask;
            while (task != null || (task = getTask()) != null) {
                try {
                    task.run();
                } catch (Exception x) {
                } 
            }
        }
    
        // 省略一大部分
        private Runnable getTask() {
            for (;;) {
                if (/*无法获取任务*/) {
                    return null;
                }
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                } catch (InterruptedException retry) {
                }
            }
        }
    

    为了便于观看,只留了核心的几行,如果传入的task不为null,则执行相应的run方法;当传入的task为null,则会从getTask()方法中获取runnable对象

    try {
          Runnable r = timed ?
              workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
              workQueue.take();
          if (r != null)
              return r;
        } catch (InterruptedException retry) { }
    

    这里面通过timed变量判断是使用poll还是task
    poll:如果有值,则立马返回。否则等待一段时间,该时间内阻塞,时间结束后,队列还是没有元素则返回null。
    task:如果有值,则立马返回。否则一直阻塞。

    什么时候返回null,不让worker继续存活了呢?
    1、线程池被shutdown,并且任务队列空了;
    2、线程池超容量;
    3、超时;

    用一张图总结

    线程池原理.jpg

    End

    相关文章

      网友评论

        本文标题:Android 多线程之线程池(二)

        本文链接:https://www.haomeiwen.com/subject/mmawnktx.html