美文网首页
java线程池源码分析

java线程池源码分析

作者: Gorden_Tam | 来源:发表于2019-02-28 12:05 被阅读0次

    一:简介
    ThreadPoolExecutor继承AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口,ExecutorService接口主要包含以下方法:


    image.png

    submit方法返回一个future类型的对象,future表示任务的执行结果,可以调用future.get返回实际的结果。
    下面来看下ThreadPoolExecutor的主要变量:

    //一个阻塞队列用于存放任务
    private final BlockingQueue<Runnable> workQueue;
        //可重入锁
        private final ReentrantLock mainLock = new ReentrantLock();
        //存放任务的集合
        private final HashSet<Worker> workers = new HashSet<Worker>();
        //锁的状态
        private final Condition termination = mainLock.newCondition();
        //最大线程数量
        private int largestPoolSize;
    
        private long completedTaskCount;
        //线程工厂,用来生产线程
        private volatile ThreadFactory threadFactory;
        //
        private volatile RejectedExecutionHandler handler;
        private volatile long keepAliveTime;
        private volatile boolean allowCoreThreadTimeOut;
        //核心线程池大小
        private volatile int corePoolSize;
        //最大线程池大小
        private volatile int maximumPoolSize;
        //拒绝策略
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    

    还有一个原子变量AtomicInteger,其高三位用来存储线程池状态信息,低三位表示线程池中任务的数量

        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    

    线程池一共分五种状态,从小到大依次为RUNNING,SHUTDOWN,STOP,TIDYING和TERMINATED。
    RUNNING:线程池接受新任务,对新添加的任务进行处理
    SHUTDOWN:线程池不接受新任务,但对已添加的任务进行处理
    STOP:线程池不接受新任务,不对添加的任务进行处理,并且会中断正在处理的任务
    TIDYING:当所有任务已终止,ctl低29位记录的任务数量为0,线程池会变成TIDYING状态,线程池变成TIDYING状态时会执行terminated方法,terminated方法在ThreadpoolExecutor中是空的,用户可以重载该函数在线程池处于TIDYING时做某些特定的处理。
    TERMINATED:线程池彻底终止。

    二.主要方法:
    通过线程池执行一个任务时,调用的是ExecutorService的submit方法,该方法定义在AbstractExecutorService中,如下:

        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    

    由此可见最终是调用execute方法来执行,如下为execute方法:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
           //如果任务数量小于核心线程池大小,则调用addworker方法,addworker方法会创建一个新线程来执行这个任务。该任务为这个线程的第一个任务。addworker方法会检查线程池的状态和worker的数量以防止线程池在不能添加线程的情况下添加线程。
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
        //
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    这里反复提到worker,到底什么时worker呢?worker

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                //rs >= SHUTDOWN:  如果线程池的状态为ruuning则允许线程加入,如果是shutdown及更之后的状态,则继续看看其他条件
                // ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()):  如果线程池当前状态不是running,但是满足当前状态为shutdown,提交的任务又是null,而且workQueue又是非空的这三个条件,也允许线程加入
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    首先是一个判段

    int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    

    这段代码的含义是只有两种情况线程池允许添加一个线程。
    1.线程池状态为running
    2.线程池状态为shutdown,任务task为null,且workqueue不为空。添加这样的线程目的是为了消耗workQueue中的线程。

    相关文章

      网友评论

          本文标题:java线程池源码分析

          本文链接:https://www.haomeiwen.com/subject/rhdazftx.html