创建线程属性
- 我们可以通过ThreadPoolExecutor构造函数来创建一个线程池:
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 用于记录线程池池的 状态和当前待work线程数量
* 前3位记录线程池状态
* 后29位记录运行work数量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** Java 中Integer 类型长度为32位,线程池用一个int类型的前3位表示线程池的状态**/
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 用来计算出当前线程池状态中间变量,同时也表示work最大数量
* 00011111 11111111 11111111 11111111
**/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/** -----------------线程池状态----------------- **/
/**
* 线程池RUNNING状态,当前状态下线程池可以接收新的任务,对新接收的任务进行处理,
* 工厂正常运行
*
* -1 二进制 11111111111111111111111111111111 左移动 29位 前三位 111
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 线程池SHUTDOWN状态,当前状态下线程池不在接收新任务,对之前接收的任务(其中包括还在队列等待和正在执行的任务)
* 工厂不在接收新的订单,工厂运行出现了问题
*
* 0 二进制 00000000000000000000000000000000 左移动 29位 前三位 000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 线程池STOP状态,当前状态下线程池不在接收新任务,对之前接收的任务存在队列没有处理的不在处理,正在执行做中断
* 工厂不在接收新的订单,工厂要倒闭了
*
* 1 二进制 00000000000000000000000000000001 左移动 29位 前三位 001
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 线程池TIDYING状态,当前没有待执行的任务,等待执行注册到JVM的钩子函数terminated()
* 工厂走倒闭程序,需要做最后清理工作
*
* 2 二进制 00000000000000000000000000000010 左移动 29位 前三位 010
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 执行完VM的钩子函数terminated()
* 工厂关闭
* 3 二进制 00000000000000000000000000000011 左移动 29位 前三位 011
*/
private static final int TERMINATED = 3 << COUNT_BITS;
/** 计算获取当前线程池状态 **/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/** 计算获取当前运行work数量**/
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 即根据线程池的状态和worker数量合并成整形 ctl
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
/** 判断当前线程池是否小于s,c表示当前线程池状态 **/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
/** 判断当前线程池是否大于等于s,c表示当前线程池状态 **/
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/** 判断当前线程池是否正在正常运行 RUNNING状态**/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS增加线程池中work数量(后29位可以直接整数运算)
* 成功返回true,失败返回false
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS减少线程池中work数量(后29位可以直接整数运算)
* 成功返回true,失败返回false
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 使用CAS减少线程池中work数量(后29位可以直接整数运算),失败循环继续尝试直到成功
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
private final BlockingQueue<Runnable> workQueue;
/**
* 存放worker线程的集合
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 控制ThreadPoolExecutor的全局可重入锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 控制ThreadPoolExecutor的全局可重入锁
*/
private final Condition termination = mainLock.newCondition();
/**
* 记录work数量(片段值)
*/
private int largestPoolSize;
/**
* 完成任务数量
*/
private long completedTaskCount;
/**
* work线程构造工厂
*/
private volatile ThreadFactory threadFactory;
/**
* 线程池无法接收新任务时,拒绝执行任务处理器,可以自定义
*/
private volatile RejectedExecutionHandler handler;
/**
* work线程(非核心线程)空闲的时间,大于此时间是被销毁
*/
private volatile long keepAliveTime;
/**
* 是否允许回收核心work线程
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 线程池中核心work线程的数量。
*/
private volatile int corePoolSize;
/**
* 线程池中允许的最大work数量
*/
private volatile int maximumPoolSize;
/** 默认的拒绝策略 **/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
线程池的创建
/**
* 创建一个线程池,使用默认线程池的拒绝策略和创建work工厂
* @param corePoolSize 线程池中核心work线程的数量。
* @param maximumPoolSize 线程池中允许的最大work数量
* @param keepAliveTime work线程(非核心线程)空闲的时间,大于此时间是被销毁
* @param unit keepAliveTime的单位。TimeUnit
* @param workQueue 用来保存等待执行的任务的阻塞队列
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* 创建一个线程池,使用默认线程池的拒绝策略
* @param corePoolSize 线程池中核心work线程的数量。
* @param maximumPoolSize 线程池中允许的最大work数量
* @param keepAliveTime work线程(非核心线程)空闲的时间,大于此时间是被销毁
* @param unit keepAliveTime的单位。TimeUnit
* @param workQueue 用来保存等待执行的任务的阻塞队列
* @param threadFactory 创建work工厂
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* 创建一个线程池,使用默认的创建work工厂
* @param corePoolSize 线程池中核心work线程的数量。
* @param maximumPoolSize 线程池中允许的最大work数量
* @param keepAliveTime work线程(非核心线程)空闲的时间,大于此时间是被销毁
* @param unit keepAliveTime的单位。TimeUnit
* @param workQueue 用来保存等待执行的任务的阻塞队列
* @param handler 线程池的拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 创建一个线程池,
* @param corePoolSize 线程池中核心work线程的数量。
* @param maximumPoolSize 线程池中允许的最大work数量
* @param keepAliveTime work线程(非核心线程)空闲的时间,大于此时间是被销毁
* @param unit keepAliveTime的单位。TimeUnit
* @param workQueue 用来保存等待执行的任务的阻塞队列
* @param threadFactory 创建work工厂
* @param handler 线程池的拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
网友评论