美文网首页三个JAVA臭皮匠程序员
并发编程之ThreadPoolExecutor(三)

并发编程之ThreadPoolExecutor(三)

作者: 后厂村老司机 | 来源:发表于2018-05-24 21:30 被阅读68次

前言:

ThreadPoolExecutor类是Executor框架的核心,负责创建线程池,执行线程池任务,下图为ThreadPoolExecutor整体架构!

1:ThreadPoolExecutor类:

这个类我们常用来创建线程池,我们来深八一下底层到底是怎么实现的。要想了解此类的实现我们首先需要了解一下ThreadPoolExecutor,而且实际上《阿里编程规约》里面要求我们用ThreadPoolExecutor而不是Executors类来创建线程池以做到更精确的配置。

ThreadPoolExecutor类的成员变量:

    //高三位表示状态,低29位表示有效线程数量,所以线程池最多可容纳线程为2的29次方-1

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static final int COUNT_BITS = Integer.SIZE - 3;

    //线程数量的最大比特位数

    private static final int CAPACITY  = (1 << COUNT_BITS) - 1;

    //线程池5种状态

    private static final int RUNNING    = -1 << COUNT_BITS;

    private static final int SHUTDOWN  =  0 << COUNT_BITS;

    private static final int STOP      =  1 << COUNT_BITS;

    private static final int TIDYING    =  2 << COUNT_BITS;

    private static final int TERMINATED =  3 << COUNT_BITS;

    // 打包线程状态和线程数量,以及解包还原两个数值

    private static int runStateOf(int c)    { return c & ~CAPACITY; }

    private static int workerCountOf(int c)  { return c & CAPACITY; }

    private static int ctlOf(int rs, int wc) { return rs | wc; }

    //阻塞队列用于存放线程执行的任务

    private final BlockingQueue<Runnable> workQueue;

    //用于保存线程池里的线程,访问该Set需要mainLock锁

    private final HashSet workers = new HashSet();

    //用于创建线程的线程工厂,该工厂调用addWorker生产线程

    private volatile ThreadFactory threadFactory;

    //线程池保存的最小存活线程数量

    private volatile int corePoolSize;

    //线程池最大存活的线程数量

    private volatile int maximumPoolSize;

    //线程池饱和或者线程池关闭时的处理策略,默认为拒绝接受任务

    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

两个关键字段:workerCount和runState,前者表示当前线程池里有效线程数量,后者表示当前线程池的状态。其他字段注解里已经说明。我们来重点关注下worker,即真正干活的线程长什么样:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

        private static final long serialVersionUID = 6138294804551838833L;

        //这里的Thread就是真正干活儿的工人了

        final Thread thread;                             

        Runnable firstTask;

        volatile long completedTasks;

        Worker(Runnable firstTask) {

            setState(-1);

            this.firstTask = firstTask;

            this.thread = getThreadFactory().newThread(this);

        }

       //调用Thread里面Run方法的方法

        public void run() {

            runWorker(this);

        }

        // 0代表非锁状态,1代表锁定状态,这个在熟悉不过了,AQS框架里的资源

        protected boolean isHeldExclusively() {

            return getState() != 0;

        }

        protected boolean tryAcquire(int unused) {

            if (compareAndSetState(0, 1)) {

                setExclusiveOwnerThread(Thread.currentThread());

                return true;

            }

            return false;

        }

        protected boolean tryRelease(int unused) {

            setExclusiveOwnerThread(null);

            setState(0);

            return true;

        }

        public void lock()        { acquire(1); }

        public boolean tryLock()  { return tryAcquire(1); }

        public void unlock()      { release(1); }

        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {

            Thread t;

            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

                try {

                    t.interrupt();

                } catch (SecurityException ignore) {

                }

            }

        }

    }

上述代码就是Worker类的全貌了,他实际上是AQS类的儿子,里面自定义实现了lock和unlock方法,设置独占AQS的state资源。最重要的就是里面组合了Thread,该Worker的Thread有ThreadFactory工厂生产,并通过run方法实现线程的执行。

认识了这些worker了我们就得考虑怎么把这些worker放到池子里:

private boolean addWorker(Runnable firstTask, boolean core) {

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN &&

                ! (rs == SHUTDOWN &&

                  firstTask == null &&

                  ! workQueue.isEmpty()))

                return false;

            for (;;) {

                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||

                    wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop

            }

        }

        //我是分割线,我的上面是判断当前线程池是否满足添加线程的条件,不满足当然加不了喽

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

                final ReentrantLock mainLock = this.mainLock;

                mainLock.lock();

                try {

                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {

                        if (t.isAlive())

                            throw new IllegalThreadStateException();

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

                    mainLock.unlock();

                }

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

private void addWorkerFailed(Worker w) {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            if (w != null)

                workers.remove(w);

            decrementWorkerCount();

            tryTerminate();

        } finally {

            mainLock.unlock();

        }

    }

不要被上面冗长的代码吓坏了,分割线以上是判断当前线程池的状态是否满足添加新线程的条件。下面才是真正添加线程的代码,而且下面的代码也很好理解:首先调用Worker构造方法利用其内部的ThreadFactory创建一个线程并把任务塞给这个新线程;然后把新线程的模型worker加入到内部的worker的Set中、改变线程池的一些属性,由于这个过程不是线程安全的(workers实际是HashSet)所以采用了锁;最后调用这个线程的start方法真正启动该线程,线程启动肯定是执行Runnable的run方法啊,run 方法在哪?好好看看Worker类(实现了Runnable)。如果添加失败了没关系,还是锁定操作worker的set集合。

好了,上面就是我们的线程池模型,现在工人也有了,工人也被我们添加到池子里去了,下面应该让工人干活儿了,工人干活儿的方法是:

final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

            while (task != null || (task = getTask()) != null) {

                w.lock();

                if ((runStateAtLeast(ctl.get(), STOP) ||

                    (Thread.interrupted() &&

                      runStateAtLeast(ctl.get(), STOP))) &&

                    !wt.isInterrupted())

                    wt.interrupt();

                try {

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    w.completedTasks++;

                    w.unlock();

                }

            }

            completedAbruptly = false;

        } finally {

            processWorkerExit(w, completedAbruptly);

        }

    }

DougLea真是烦人呐,又搞了这么长的代码,代码随长,还是很清晰的。线程start之后执行了Worker类中的run方法,而run方法内部实际上调用的就是runWorker方法。该方法的关键在于while循环,循环条件是firstTask或者从BlockingQueue中取的task不为空的时候执行,该方法给我们留了两个钩子,即beforeExecute和afterExecute,如果我们实现一个子类可以实现这两个方法给自己定制。真正执行的部分在于task.run()。

上面代码我们关注getTask()方法:

private Runnable getTask() {

        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                decrementWorkerCount();

                return null;

            }

            int wc = workerCountOf(c);

            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))

                && (wc > 1 || workQueue.isEmpty())) {

                if (compareAndDecrementWorkerCount(c))

                    return null;

                continue;

            }

            try {

                Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

                if (r != null)

                    return r;

                timedOut = true;

            } catch (InterruptedException retry) {

                timedOut = false;

            }

        }

    }

这个方法的关键在于这句话Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();这句话就是阻塞队列取元素的方法,所以该方法是线程安全的。

OK,上面就是ThreadPoolExecutor的具体实现,然而该类对外部暴露的api最常用的则是构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,

                              ThreadFactory threadFactory,

                              RejectedExecutionHandler handler) {

        if (corePoolSize < 0 ||

            maximumPoolSize <= 0 ||

            maximumPoolSize < corePoolSize ||

            keepAliveTime < 0)

            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)

            throw new NullPointerException();

        this.corePoolSize = corePoolSize;

        this.maximumPoolSize = maximumPoolSize;

        this.workQueue = workQueue;

        this.keepAliveTime = unit.toNanos(keepAliveTime);

        this.threadFactory = threadFactory;

        this.handler = handler;

    }

构造方法分为很多中,每种可以实现不同的线程池,《阿里巴巴编程规约》里要求我们直接用构造方法定制一定数目的线程池。

OK,以上我们解决了构造函数的问题,现在我们看一下线程池是怎么执行的:

public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        *如果池子里线程数量小于最小线程数,那么新起一个线程并把command当作第                一个任务,

        *如果任务成功加入队列中,我们仍需要检测是否有线程加入workers里了

        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        else if (!addWorker(command, false))

            reject(command);

    }

以上代码最重要的部分就是addworker,实际上首先保证有足够线程,如果没有就创建线程执行任务(不是创建core个线程哦)如果有就把任务放到BlockingQueue中等待线程去拿任务执行。

最后看看线程池的关闭:

public void shutdown() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            checkShutdownAccess();

            advanceRunState(SHUTDOWN);

            //中断线程池里没有执行任务并且等待任务的线程,调用的是t.interupt方法

            interruptIdleWorkers();

            //给定时线程池留的钩子

            onShutdown(); // hook for ScheduledThreadPoolExecutor

        } finally {

            mainLock.unlock();

        }

        tryTerminate();

    }

public ListshutdownNow() { List tasks;

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            checkShutdownAccess();

            advanceRunState(STOP);

            //直接中断所有线程,调用的是interrupt方法

            interruptWorkers();

            //把没执行完的任务放到List里面返回

            tasks = drainQueue();

        } finally {

            mainLock.unlock();

        }

        tryTerminate();

        return tasks;

    }

两个关闭线程池方法略有区别,shutdown方法只改变线程池状态为SHUTDOWN,这个时候线程池拒绝接受任务但是会执行完原有的任务。shutdownNow直接关闭线程池,即不接受任务也不执行原来的任务。停止线程调用的都是Thread.interruput方法!!!!

本篇实际上漏掉了AbstractExecutorService里的一些submit方法,没关系,我们在下一篇ScheduledThreadPoolExecutor中补回来!

相关文章

网友评论

    本文标题:并发编程之ThreadPoolExecutor(三)

    本文链接:https://www.haomeiwen.com/subject/nixejftx.html