美文网首页
ThreadPoolExecutor源码分析(一):重要的成员变

ThreadPoolExecutor源码分析(一):重要的成员变

作者: 徐同学呀 | 来源:发表于2018-12-12 17:09 被阅读0次

    ThreadPoolExecutor部分重要成员变量:
    1、AtomicInteger ctl
    2、workQueue
    3、corePoolSize
    4、maximumPoolSize
    5、keepAliveTime
    6、handler

    一、AtomicInteger ctl

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    AtomicInteger类型的ctl代表了ThreadPoolExecutor中的控制状态,它是一个复核类型的成员变量,是一个原子整数,借助高低位包装了两个概念:

    1. workerCount:线程池中当前活动的线程数量,占据ctl的低29位;
    2. runState:线程池运行状态,占据ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。

    workerCount:

    COUNT_BITS的定义代表了workerCount所占位数:

    private static final int COUNT_BITS = Integer.SIZE - 3;
    //在Java中,一个int占据32位,而COUNT_BITS的结果不言而喻,Integer大小32减去3,就是29。
    

    CAPACITY的定义限制了workerCount的理论最大活跃线程数:

    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    //运算过程为1左移29位,也就是00000000 00000000 00000000 00000001 --> 001 0000 00000000 00000000 00000000,
    //再减去1,就是 000 11111 11111111 11111111 11111111,
    //前三位代表线程池运行状态runState,
    //所以这里workerCount的理论最大值就应该是29个1,即536870911。
    

    workerCount作为其中一个概念复合在AtomicInteger ctl中,ThreadPoolExecutor也提供了从AtomicInteger ctl中解析出workerCount的方法,如下:

    private static int workerCountOf(int c)  { 
        return c & CAPACITY; 
    }
    //传入的c代表的是ctl的值,即高3位为线程池运行状态runState,低29位为线程池中当前活动的线程数量workerCount.
    //将其与CAPACITY进行与操作&,也就是与000 11111 11111111 11111111 11111111进行与操作,
    //c的前三位通过与000进行与操作,无论c前三位为何值,最终都会变成000,也就是舍弃前三位的值,
    //而c的低29位与29个1进行与操作,c的低29位还是会保持原值,
    //这样就从AtomicInteger ctl中解析出了workerCount的值。
    
    

    runState:

    线程池运行状态,它占据ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。我们先分别解释下这五种状态:

    1. RUNNING:接受新任务,并处理队列任务
    //Accept new tasks and process queued tasks
    private static final int RUNNING    = -1 << COUNT_BITS;
    

    -1在Java底层是由32个1表示的,左移29位的话,即111 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位全部为1的话,表示RUNNING状态,即-536870912;

    1. SHUTDOWN:不接受新任务,但会处理队列任务
    //Don't accept new tasks, but process queued tasks
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    

    0在Java底层是由32个0表示的,无论左移多少位,还是32个0,即000 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位全部为0的话,表示SHUTDOWN状态,即0。

    1. STOP:不接受新任务,不会处理队列任务,而且会中断正在处理过程中的任务
    //Don't accept new tasks, don't process queued tasks,
    //and interrupt in-progress tasks
    private static final int STOP       =  1 << COUNT_BITS;
    

    1在Java底层是由前面的31个0和1个1组成的,左移29位的话,即001 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为001的话,表示STOP状态,即536870912;

    1. TIDYING:所有的任务已结束,workerCount为0,线程过渡到TIDYING状态,将会执行terminated()钩子方法。
    /*
    All tasks have terminated, workerCount 
    is zero,the thread transitioning to 
    state TIDYING will run the terminated() hook method
    */
    private static final int TIDYING    =  2 << COUNT_BITS;
    

    2在Java底层是由前面的30个0和1个10组成的,左移29位的话,即010 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为010的话,表示TIDYING状态,即1073741824。

    1. TERMINATED:terminated()方法已经完成
    //terminated() has completed
    private static final int TERMINATED =  3 << COUNT_BITS;
    

    3在Java底层是由前面的30个0和1个11组成的,左移29位的话,即011 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为011的话,表示TERMINATED状态,即1610612736。

    由上面我们可以得知,运行状态的值按照RUNNING-->SHUTDOWN-->STOP-->TIDYING-->TERMINATED顺序值是递增的,这些值之间的数值顺序很重要。随着时间的推移,运行状态单调增加,但是不需要经过每个状态。那么,可能存在的线程池状态的转换是什么呢?如下:

         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed
         *
    
    1. RUNNING -> SHUTDOWN:调用shutdown()方法后,或者线程池实现了finalize方法,在里面调用了shutdown方法,即隐式调用。
    2. (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()方法后。
    3. SHUTDOWN -> TIDYING:线程池和队列均为空时。
    4. STOP -> TIDYING:线程池为空时。
    5. TIDYING -> TERMINATED:terminated()钩子方法完成时。

    接下来看看如何从ctl中提取runState:

    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    

    ~ 是按位取反的意思,CAPACITY表示的是高位的3个0,和低位的29个1,而~CAPACITY则表示高位的3个1,2低位的9个0,然后再与入参c执行按位与操作,即高3位保持原样,低29位全部设置为0,也就获取了线程池的运行状态runState。

    最后看一下这个方法:

    private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    将runState和workerCount做或操作|处理,即用runState的高3位,workerCount的低29位填充的数字,而默认传入的runState、workerCount分别为RUNNING和0。

    接下来看一下其他几个重要的成员变量:

        /**
         * The queue used for holding tasks and handing off to worker
         * threads.  We do not require that workQueue.poll() returning
         * null necessarily means that workQueue.isEmpty(), so rely
         * solely on isEmpty to see if the queue is empty (which we must
         * do for example when deciding whether to transition from
         * SHUTDOWN to TIDYING).  This accommodates special-purpose
         * queues such as DelayQueues for which poll() is allowed to
         * return null even if it may later return non-null when delays
         * expire.
         */
        private final BlockingQueue<Runnable> workQueue;
    
        /**
         * Lock held on access to workers set and related bookkeeping.
         * While we could use a concurrent set of some sort, it turns out
         * to be generally preferable to use a lock. Among the reasons is
         * that this serializes interruptIdleWorkers, which avoids
         * unnecessary interrupt storms, especially during shutdown.
         * Otherwise exiting threads would concurrently interrupt those
         * that have not yet interrupted. It also simplifies some of the
         * associated statistics bookkeeping of largestPoolSize etc. We
         * also hold mainLock on shutdown and shutdownNow, for the sake of
         * ensuring workers set is stable while separately checking
         * permission to interrupt and actually interrupting.
         */
        private final ReentrantLock mainLock = new ReentrantLock();
    
        /**
         * Set containing all worker threads in pool. Accessed only when
         * holding mainLock.
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        /**
         * Wait condition to support awaitTermination
         */
        private final Condition termination = mainLock.newCondition();
    
        /**
         * Tracks largest attained pool size. Accessed only under
         * mainLock.
         */
        private int largestPoolSize;
    
        /**
         * Counter for completed tasks. Updated only on termination of
         * worker threads. Accessed only under mainLock.
         */
        private long completedTaskCount;
    
        /*
         * All user control parameters are declared as volatiles so that
         * ongoing actions are based on freshest values, but without need
         * for locking, since no internal invariants depend on them
         * changing synchronously with respect to other actions.
         */
    
        /**
         * Factory for new threads. All threads are created using this
         * factory (via method addWorker).  All callers must be prepared
         * for addWorker to fail, which may reflect a system or user's
         * policy limiting the number of threads.  Even though it is not
         * treated as an error, failure to create threads may result in
         * new tasks being rejected or existing ones remaining stuck in
         * the queue.
         *
         * We go further and preserve pool invariants even in the face of
         * errors such as OutOfMemoryError, that might be thrown while
         * trying to create threads.  Such errors are rather common due to
         * the need to allocate a native stack in Thread.start, and users
         * will want to perform clean pool shutdown to clean up.  There
         * will likely be enough memory available for the cleanup code to
         * complete without encountering yet another OutOfMemoryError.
         */
        private volatile ThreadFactory threadFactory;
    
        /**
         * Handler called when saturated or shutdown in execute.
         */
        private volatile RejectedExecutionHandler handler;
    
        /**
         * Timeout in nanoseconds for idle threads waiting for work.
         * Threads use this timeout when there are more than corePoolSize
         * present or if allowCoreThreadTimeOut. Otherwise they wait
         * forever for new work.
         */
        private volatile long keepAliveTime;
    
        /**
         * If false (default), core threads stay alive even when idle.
         * If true, core threads use keepAliveTime to time out waiting
         * for work.
         */
        private volatile boolean allowCoreThreadTimeOut;
    
        /**
         * Core pool size is the minimum number of workers to keep alive
         * (and not allow to time out etc) unless allowCoreThreadTimeOut
         * is set, in which case the minimum is zero.
         */
        private volatile int corePoolSize;
    
        /**
         * Maximum pool size. Note that the actual maximum is internally
         * bounded by CAPACITY.
         */
        private volatile int maximumPoolSize;
    
        /**
         * The default rejected execution handler
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
    
    private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
    private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小
                                                                  //、runState等)的改变都要使用这个锁
    private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
     
    private volatile long  keepAliveTime;    //线程存货时间   
    private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
    private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
     
    private volatile int   poolSize;       //线程池中当前的线程数
     
    private volatile RejectedExecutionHandler handler; //任务拒绝策略
     
    private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
     
    private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
     
    private long completedTaskCount;   //用来记录已经执行完毕的任务个数
    

    二、workQueue:

    阻塞队列,用来存储等待执行的任务,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
    LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
    synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
    

    三、corePoolSize:

    核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

    四、maximumPoolSize:

    线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    五、keepAliveTime:

    表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    六、handler:

    表示当拒绝处理任务时的策略,有以下四种取值:

    //当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
    

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor源码分析(一):重要的成员变

          本文链接:https://www.haomeiwen.com/subject/gfephqtx.html