美文网首页
多线程编程ThreadPoolExecutor代码分析

多线程编程ThreadPoolExecutor代码分析

作者: xingtangcunliu | 来源:发表于2016-11-11 15:25 被阅读0次

知识整理,是对知识点的梳理,以前没什么感觉,但多写了几篇博客之后发现,我对某个知识点不是完全懂了,我的思路不清晰,在大脑中没有完整的知识体系,还有也发现自己的文本表达能力较弱,对某个知识的理解混乱没调理。所以开始加大写博客的力度,锻炼自己的能力以及分享,现在就是,我有个观点就是你把代码写出来不代表你就会了,你必须能表达出来,因为你会了是别人对你的认同,而不是自己对自己的认同。

我对网络请求进行封装以及研究AsyncTask 时,仔细研究过线程池ThreadPoolExecutor的源码,所以今天把ThreadPoolExecutor讲清。ThreadPoolExecutor就是起到管理线程的作用,就是保证线程的重用,防止一直重复创建线程,消耗大量资源,因为线程的创建时很消耗资源的。

1、ThreadPoolExecutor初始化

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        //核心线程数
        this.corePoolSize = corePoolSize;
        //最大线程数
        this.maximumPoolSize = maximumPoolSize;
        //请求等待队列
        this.workQueue = workQueue;
        //线程存活时间
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        //线程创建工厂类
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

可以看出ThreadPoolExecutor主要包括corePoolSize 、maximumPoolSize 、workQueue 、keepAliveTime 、threadFactory 属性。以及一个很重要的属性ctl,AtomicInteger类型,原子数,保证原子操作,下面是对该属性注释的部分解释。

 /**
     * 整个线程池的控制状态,包含了两个属性:有效线程的数量、线程池的状态(runState)。
     *   workerCount,有效线程的数量
     *   runState,   线程池的状态
     *
     * ctl 包含32位数据,低29位存线程数,高3位存runState,这样runState有5个值:
     *
     *   RUNNING:  接受新任务,处理任务队列中的任务
     *   SHUTDOWN: 不接受新任务,处理任务队列中的任务
     *   STOP:    不接受新任务,不处理任务队列中的任务
     *   TIDYING:  所有任务完成,线程数为0,然后执行terminated()
     *   TERMINATED: terminated() 已经完成
     * 具体值:
     * RUNNING:-536870912
     * SHUTDOWN:0
     * STOP:536870912
     * TIDYING:1073741824
     * TERMINATED:1610612736
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  // Packing and unpacking ctl
   //获取runState值,线程池的运行状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //获取workerCount值,有效线程的数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //将运行状态和线程池数组合成新的ctl值。
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    //是否运行中
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

corePoolSize :

这些线程一直存活,就是只要当前线程数小于corePoolSize ,那么就会添加,而且就算当前没有任务,只要线程数不大于当前corePoolSize ,那么这些线程就会一直存活,当然如果调用allowCoreThreadTimeOut(true)方法,那么这些线程在没有任务的时候也会释放掉。

maximumPoolSize :

最大线程数,顾名思义就算当前线程池所持有的最多线程,如果超出这个数就会报异常。

workQueue :

请求等待队列,当当前线程数不小于corePoolSize 时,而workQueue 队列没有满,那么这时就会把请求放到workQueue 队列中,等待执行。

keepAliveTime :

线程等待存活时间,也就是当线程闲置下来时等待下次任务最长时间,默认情况下,这时对核心线程之外的线程的处理,也就是大于corePoolSize 的线程等待时间,当调用allowCoreThreadTimeOut(true)方法,如果当前没有任务,核心线程也会有存活时间。

threadFactory :

线程创建工厂类,创建线程,一般就是调用new Thread()创造线程,当然也可以自己继承Thread,添加自己需要的属性以及操作。

2、添加任务以及执行

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1.如果线程数小于corePoolSize ,创建一个新的线程执行任务
         *
         * 2. 如果当前线程数不小于corePoolSize ,尝试把任务添加到等待队列中。
         *
         * 3.如果不能把任务添加到请求队列中,则尝试创建一个新的线程,也就是线程数大于corePoolSize 小于maximumPoolSize 的情况。
         */
        int c = ctl.get();
        /**
        * 当前有效线程数小于核心线程数,尝试添加任务
        **/
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
       /**
        * 如果线程池处于RUNNING状态则尝试把请求加入等待队列
        **/
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       /**
        * 如果请求加入等待队列失败,则尝试创建新线程
        **/
        else if (!addWorker(command, false))
            reject(command);
    }

在这里一直调用了addWorker方法,可见该方法的重要性。

/*
    /**
     * @param firstTask 等待执行的任务
     *
     * @param core 是否把任务添加到核心线程中执行 ,true用 核心线程数判断执行,false 用最大线程数判断执行
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);
            // 不添加firstTask条件
           1、线程池状态不能为RUNNING
            2、当线程池状态为SHUTDOWN 时,firstTask为null,并且workQueue不为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) { 
                // 获取当前线程数
                int wc = workerCountOf(c);
                //第一个添加不可能,看第二个,当core为true时,添加的是核心线程数中,,当core为false时,添加的是最大线程数中。如果当前线程数大于等于要求线程数(core为true,corePoolSize :core为false,maximumPoolSize)则不添加任务,
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //最新的线程数和addWorker一开始获取的值进行比较,看有没有被修改,如果修改过了则重新进行判断,这里用到了goto语句。就是跳到上面retry的地方,重新尝试的意思。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //和上面方法一样,只是这里判断线程池运行状态有没有改变。
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //把任务封装在Worker中,最后都会在Worker中执行,并创建一个新线程。
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //锁,保证线程同步
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   //获取线程池状态
                    int rs = runStateOf(ctl.get());
                    // 如果线程池处于RUNNING状态执行添加任务操作,或线程池处于SHUTDOWN 状态,firstTask 为空。
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                   //线程执行
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

可以看出addWorker方法主要是生成新的线程,而线程的重用则在Worker类中实现。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** worker持有的线程 */
        final Thread thread;
        /** worker正在执行的任务 ,可能为null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * 创建Worker时会同时创建一个新线程.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
           //把Worker传递给新建的线程,当线程执行是会调用Worker的run方法。
            this.thread = getThreadFactory().newThread(this);
        }

        /** 线程执行时会调用该方法 */
        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

最后执行runWorker方法:

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           //获取要执行的任务,getTask()是线程阻塞的,当然有判断条件,下面再说。也就是说只有只有getTask()获取任务为null时,线程就会结束。
            while (task != null || (task = getTask()) != null) {
                w.lock();
               //逻辑复杂,就是在在STOP以及大于STOP状态时尝试中断线程。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //任务真正执行前操作
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

线程获取待执行任务方法,从上面的分析可以看出,这里才是线程重用的关键,所以下面分析getTask方法。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 当线程池处于STOP及以上状态时,线程数减一,该线程不使用。
           //当线程处于SHUTDOWN 状态时,并且workQueue请求队列为空,释放该线程。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //获取当前线程数
            int wc = workerCountOf(c);

            // 如果调用allowCoreThreadTimeOut方法设置为true,则所有线程都有超时时间。
           //如果当前线程数大于核心线程数则该线程有超时时间。
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
           
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //poll方法为线程阻塞方法,keepAliveTime为阻塞最长时间,若超时则返回null,
                //take()方法没有超时时间,会一直获取。也就是说在这里不断获取任务,
              //也就是如果线程池处于RUNNING、SHUTDOWN状态时,只要等待队列不为空,那么线程就会一直执行。这也就是线程重用的原理。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

3、结束线程池

ThreadPoolExecutor有两个结束的方法shutdown、shutdownNow。shutdown是把线程池状态转为SHUTDOWN,这时等待队列中的任务可以继续执行;
shutdownNow方法是把线程池状态转为SHUTDOWN,这时等待队列中的任务不可以继续执行,只能执行已经执行的任务;

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //把线程池状态改为SHUTDOWN
            advanceRunState(SHUTDOWN);
           // 中断所有空闲线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
           //把线程池状态改为STOP
            advanceRunState(STOP);
            // 中断所有空闲线程
            interruptWorkers();
            // 返回队列中还没有被执行的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

advanceRunState 改变线程池的状态

 //把线程池状态改为目标状态targetState
 private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

interruptIdleWorkers中断线程

 private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
 private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //线程没有被中断并且Worker 正在获取任务中,就是空闲中。线程中断
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

getTask方法中可以看出如果线程池处于STOP已经以上状态时不会继续获取任务,而是尝试中断线程,这也就是shutdown、shutdownNow的区别。我查找资料发现这些内容:
1、ReentrantLock.lockInterruptibly允许在等待时由其它线程调用等待线程的Thread.interrupt方法来中断等待线程的等待而直接返回,这时不用获取锁,而会抛出一个InterruptedException。

然后我们进入Executors的生成方法,发现使用的是LinkedBlockingQueue类,而LinkedBlockingQueue的take()方法如下

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
       //调用了lockInterruptibly方法
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

所以当线程在获取任务阻塞时,如果该线程被调用了interupt方法,则该线程释放,所以说释放空闲线程。

相关文章

网友评论

      本文标题:多线程编程ThreadPoolExecutor代码分析

      本文链接:https://www.haomeiwen.com/subject/uixfpttx.html