ThreadPoolExecutor部分重要成员变量:
1、AtomicInteger ctl
2、workQueue
3、corePoolSize
4、maximumPoolSize
5、keepAliveTime
6、handler
一、AtomicInteger ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
AtomicInteger类型的ctl代表了ThreadPoolExecutor中的控制状态,它是一个复核类型的成员变量,是一个原子整数,借助高低位包装了两个概念:
- workerCount:线程池中当前活动的线程数量,占据ctl的低29位;
- runState:线程池运行状态,占据ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。
workerCount:
COUNT_BITS的定义代表了workerCount所占位数:
private static final int COUNT_BITS = Integer.SIZE - 3;
//在Java中,一个int占据32位,而COUNT_BITS的结果不言而喻,Integer大小32减去3,就是29。
CAPACITY的定义限制了workerCount的理论最大活跃线程数:
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//运算过程为1左移29位,也就是00000000 00000000 00000000 00000001 --> 001 0000 00000000 00000000 00000000,
//再减去1,就是 000 11111 11111111 11111111 11111111,
//前三位代表线程池运行状态runState,
//所以这里workerCount的理论最大值就应该是29个1,即536870911。
workerCount作为其中一个概念复合在AtomicInteger ctl中,ThreadPoolExecutor也提供了从AtomicInteger ctl中解析出workerCount的方法,如下:
private static int workerCountOf(int c) {
return c & CAPACITY;
}
//传入的c代表的是ctl的值,即高3位为线程池运行状态runState,低29位为线程池中当前活动的线程数量workerCount.
//将其与CAPACITY进行与操作&,也就是与000 11111 11111111 11111111 11111111进行与操作,
//c的前三位通过与000进行与操作,无论c前三位为何值,最终都会变成000,也就是舍弃前三位的值,
//而c的低29位与29个1进行与操作,c的低29位还是会保持原值,
//这样就从AtomicInteger ctl中解析出了workerCount的值。
runState:
线程池运行状态,它占据ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。我们先分别解释下这五种状态:
- RUNNING:接受新任务,并处理队列任务
//Accept new tasks and process queued tasks
private static final int RUNNING = -1 << COUNT_BITS;
-1在Java底层是由32个1表示的,左移29位的话,即111 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位全部为1的话,表示RUNNING状态,即-536870912;
- SHUTDOWN:不接受新任务,但会处理队列任务
//Don't accept new tasks, but process queued tasks
private static final int SHUTDOWN = 0 << COUNT_BITS;
0在Java底层是由32个0表示的,无论左移多少位,还是32个0,即000 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位全部为0的话,表示SHUTDOWN状态,即0。
- STOP:不接受新任务,不会处理队列任务,而且会中断正在处理过程中的任务
//Don't accept new tasks, don't process queued tasks,
//and interrupt in-progress tasks
private static final int STOP = 1 << COUNT_BITS;
1在Java底层是由前面的31个0和1个1组成的,左移29位的话,即001 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为001的话,表示STOP状态,即536870912;
- TIDYING:所有的任务已结束,workerCount为0,线程过渡到TIDYING状态,将会执行terminated()钩子方法。
/*
All tasks have terminated, workerCount
is zero,the thread transitioning to
state TIDYING will run the terminated() hook method
*/
private static final int TIDYING = 2 << COUNT_BITS;
2在Java底层是由前面的30个0和1个10组成的,左移29位的话,即010 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为010的话,表示TIDYING状态,即1073741824。
- TERMINATED:terminated()方法已经完成
//terminated() has completed
private static final int TERMINATED = 3 << COUNT_BITS;
3在Java底层是由前面的30个0和1个11组成的,左移29位的话,即011 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为011的话,表示TERMINATED状态,即1610612736。
由上面我们可以得知,运行状态的值按照RUNNING-->SHUTDOWN-->STOP-->TIDYING-->TERMINATED顺序值是递增的,这些值之间的数值顺序很重要。随着时间的推移,运行状态单调增加,但是不需要经过每个状态。那么,可能存在的线程池状态的转换是什么呢?如下:
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
- RUNNING -> SHUTDOWN:调用shutdown()方法后,或者线程池实现了finalize方法,在里面调用了shutdown方法,即隐式调用。
- (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()方法后。
- SHUTDOWN -> TIDYING:线程池和队列均为空时。
- STOP -> TIDYING:线程池为空时。
- TIDYING -> TERMINATED:terminated()钩子方法完成时。
接下来看看如何从ctl中提取runState:
private static int runStateOf(int c) { return c & ~CAPACITY; }
~ 是按位取反的意思,CAPACITY表示的是高位的3个0,和低位的29个1,而~CAPACITY则表示高位的3个1,2低位的9个0,然后再与入参c执行按位与操作,即高3位保持原样,低29位全部设置为0,也就获取了线程池的运行状态runState。
最后看一下这个方法:
private static int ctlOf(int rs, int wc) { return rs | wc; }
将runState和workerCount做或操作|处理,即用runState的高3位,workerCount的低29位填充的数字,而默认传入的runState、workerCount分别为RUNNING和0。
接下来看一下其他几个重要的成员变量:
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小
//、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集
private volatile long keepAliveTime; //线程存货时间
private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private volatile int poolSize; //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount; //用来记录已经执行完毕的任务个数
二、workQueue:
阻塞队列,用来存储等待执行的任务,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
三、corePoolSize:
核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
四、maximumPoolSize:
线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
五、keepAliveTime:
表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
六、handler:
表示当拒绝处理任务时的策略,有以下四种取值:
//当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
网友评论