美文网首页
一般线程池ThreadPoolExecutor源码分析

一般线程池ThreadPoolExecutor源码分析

作者: 生不悔改 | 来源:发表于2022-03-03 16:32 被阅读0次

一、类内部结构介绍

先了解一下二进制吧。

    public static void main(String[] args) {
        System.out.println("数字2的二进制:"+Integer.toBinaryString(2));
        System.out.println("数字3的二进制:"+Integer.toBinaryString(3));
        System.out.println("数字4的二进制:"+Integer.toBinaryString(4));
        System.out.println("数字5的二进制:"+Integer.toBinaryString(5));
        System.out.println("数字6的二进制:"+Integer.toBinaryString(6));
        System.out.println("数字7的二进制:"+Integer.toBinaryString(7));
        System.out.println("数字8的二进制:"+Integer.toBinaryString(8));
        System.out.println("int类型的最大值:"+Integer.MAX_VALUE);
        System.out.println("int类型的最大值的二进制:"+Integer.toBinaryString(Integer.MAX_VALUE));
        System.out.println("int类型的最小值:"+Integer.MIN_VALUE);
        System.out.println("int类型的最小值的二进制:"+Integer.toBinaryString(Integer.MIN_VALUE));
    }

Integer.toBinaryString()方法就是将整数转换成二进制,最大32位二进制,并且该方法不会自动补位,就是给高位补零。
看一下执行结果

数字2的二进制:10
数字3的二进制:11
数字4的二进制:100
数字5的二进制:101
数字6的二进制:110
数字7的二进制:111
数字8的二进制:1000
int类型的最大值:2147483647
int类型的最大值的二进制:1111111111111111111111111111111
int类型的最小值:-2147483648
int类型的最小值的二进制:10000000000000000000000000000000

二进制相关计算

1.按位与运算符(& 叫做and)

规则:0&0 = 0, 0&1 = 0, 1&1=1.也就是说两位同时为1,结果为1,否则为0  
例子: 3 & 5 = 1.(000011 & 000101 = 000001)

2.按位或运算符( | 或者叫做or)

规则:0|0 = 0,1|0 = 1,0|1 = 1,1|1 = 1 参加位运算的两位只要有一个为1,那么就为1
  例子:3 | 5 = 7(0000011 | 00000101 = 0000111)

3.异或运算符(^ 也叫xor(以后做题会遇到xor,就是异或))

规则:0^0 = 0,01=1,10=1,1^1=0 参加位运算的两位只要相同为0,不同为1
  例子:3^5 = 6(00000011^00000101=00000110)
  特别的任意数 ^ 0 = 任意数.

4.取反运算符(~)

规则:二进制位0变为1,1变为0

5.左移(<<)

规则:相当于乘以2

6.右移(>>)

规则:相当于除以2

查看线程池的相关参数

线程池一些关键常量的定义

   // 线程池   运行状态  +  存活线程数 0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 获得当前线程池的运行状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取当前线程池存活的线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 当前线程池标识:运行状态  +  存活线程数
    private static int ctlOf(int rs, int wc) { return rs | wc; }

写一个自动补位的将数字转换成32位二进制的方法

    /**
     * 数字转换成32位二进制
     *
     * @param number
     * @return
     */
    public static String ComplementZero(Integer number) {
        String result = Integer.toBinaryString(number);
        final int LENGTH = 32;
        if (result.length() < LENGTH) {
            StringBuilder stringBuilder = new StringBuilder(result);
            int n = LENGTH - result.length();
            for (int i = 0; i < n; i++) {
                stringBuilder = new StringBuilder("0").append(stringBuilder);
            }
            return stringBuilder.toString();
        }
        return result;
    }

执行线程池的相关代码,我们来找找规律

    public static void main(String[] args) {
        // 正在运行:接受新任务并处理排队的任务
        System.out.println("RUNNING            " + ComplementZero(RUNNING));
        // 关机:不接受新任务,但处理排队的任务
        System.out.println("SHUTDOWN           " + ComplementZero(SHUTDOWN));
        // 停止:不要接受新的任务,不要处理排队的任务,并中断正在进行的任务
        System.out.println("STOP               " + ComplementZero(STOP));
        // 整理:所有任务都已终止,workerCount为零,线程正在转换为状态清理将运行 terminated() 钩子方法
        System.out.println("TIDYING            " + ComplementZero(TIDYING));
        // 已终止:TERMINATED()已完成
        System.out.println("TERMINATE          " + ComplementZero(TERMINATED));
        // 线程池容量:五亿多个线程,
        System.out.println("CAPACITY           " + ComplementZero(CAPACITY));
}

执行结果:

RUNNING            11100000000000000000000000000000
SHUTDOWN           00000000000000000000000000000000
STOP               00100000000000000000000000000000
TIDYING            01000000000000000000000000000000
TERMINATE          01100000000000000000000000000000
CAPACITY           00011111111111111111111111111111

Process finished with exit code 0

先解释一下:rs:runningStatus(运行状态) wc:workerCount(worker数量)
线程池将32位的二进制数字的前三位做成了标识线程池的运行状态,后29位做成了表示线程池运行的线程数量,先知道了这点,我们可以看一下打印结果对应的二进制的前三位。

RUNNING            111
SHUTDOWN           000
STOP               001
TIDYING            010
TERMINATE          011

这就是对应的线程池的状态。但是在计算机中,会对这样的二进制自动补位。
再看一下线程池容量capacity的二进制表示,我们说过前三位标识线程池状态,所以我们看后面的29位,是 1 1111 1111 1111 1111 1111 1111 1111转换成数字是536870911,等于最大线程数可以有五亿多条。

CAPACITY           00011111111111111111111111111111

内部线程Worker类的介绍

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

实际上Worker是一个继承AQS框架,实现Runnable接口的。

二、核心方法的介绍

线程池的execute()方法

    public void execute(Runnable command) {
        // 校验worker对象,不能为空
        if (command == null)
            throw new NullPointerException();
/*
分三步进行:
1.如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个线程,启动一个新线程任务对addWorker的调用会自动检查运行状态和workerCount,从而防止可能增加在不应该的情况下,返回false。
2.如果任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程(因为自上次检查以来,已有的已经死亡)或者自进入此方法后,池已关闭。所以我们重新检查状态,如有必要,在以下情况下回滚排队已停止,如果没有线程,则启动新线程。
3.如果无法将任务排队,则尝试添加新任务线如果失败了,我们知道我们已经被关闭或饱和了所以拒绝这个任务。
*/
        // 获取线程池标识(c转换成二进制前三位标识线程池状态,后29位标识表示存活的线程数)
        int c = ctl.get();
        // 存活的线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 加入队列,并且标识为核心线程标志
            if (addWorker(command, true))
                return;
            // 更新当前的线程池状态标识
            c = ctl.get();
        }
        // 如果当前线程池是运行状态 && 队列末尾能加入新的任务线程
        if (isRunning(c) && workQueue.offer(command)) {
            // 拿到最新的线程池状态(加入队列的任务线程后结算的状态)
            int recheck = ctl.get();
            // 最新的线程池的状态不是运行状态 && 移除刚刚的任务线程成功
            if (! isRunning(recheck) && remove(command))
                // 执行拒绝策略
                reject(command);
            // 否在检查最新的线程存活数是不是0
            else if (workerCountOf(recheck) == 0)
                // 如果是0(核心线程满了),创建非核心线程
                addWorker(null, false);
        }
        // 如果此刻新的任务无法加入队列(以非核心线程方式)
        else if (!addWorker(command, false))
            // 执行拒绝策略
            reject(command);
    }

线程池的addWorker()方法

/**
** firstTask   实现runnable接口的任务
** core         是否是核心线程
**/
private boolean addWorker(Runnable firstTask, boolean core) {
        // 循环的入口标志
        retry:
        // 死循环
        for (;;) {
            // 拿到当前线程池的标识 (状态+worker数)
            int c = ctl.get();
            // 获取当前的线程池状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 获取当前worker数量
                int wc = workerCountOf(c);
                // 如果worker数量大于等于线程池可以容纳的最大值(五亿)
                // 或者当前 worker数量大于(核心线程数,最大线程数),根据是否是核心线程标识去比较对应的值
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // cas方法增加线程数成功
                if (compareAndIncrementWorkerCount(c))
                    // 跳到最开始的循环标识位,重新进入循环
                    break retry;
                // 获取当前线程标识
                c = ctl.get();  // Re-read ctl
                // 如果当前线程池状态与加入一个线程后的状态不一样
                if (runStateOf(c) != rs)
                    // 重新进入循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // worker开始标致
        boolean workerStarted = false;
        // worker加入队列成功标识
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 设置重入锁,并上锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 再次检查线程池当前状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // HashSet<Worker> workers = new HashSet<Worker>(),workers是worker的Set集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

小结

线程池的工作过程的设计很复杂,主要还是需要自己去debug去跟踪,理解里面的各个属性和方法的意义。慢慢一步一步的揭开里面的奥秘。

相关文章

网友评论

      本文标题:一般线程池ThreadPoolExecutor源码分析

      本文链接:https://www.haomeiwen.com/subject/jtpdrrtx.html