一、类内部结构介绍
先了解一下二进制吧。
public static void main(String[] args) {
System.out.println("数字2的二进制:"+Integer.toBinaryString(2));
System.out.println("数字3的二进制:"+Integer.toBinaryString(3));
System.out.println("数字4的二进制:"+Integer.toBinaryString(4));
System.out.println("数字5的二进制:"+Integer.toBinaryString(5));
System.out.println("数字6的二进制:"+Integer.toBinaryString(6));
System.out.println("数字7的二进制:"+Integer.toBinaryString(7));
System.out.println("数字8的二进制:"+Integer.toBinaryString(8));
System.out.println("int类型的最大值:"+Integer.MAX_VALUE);
System.out.println("int类型的最大值的二进制:"+Integer.toBinaryString(Integer.MAX_VALUE));
System.out.println("int类型的最小值:"+Integer.MIN_VALUE);
System.out.println("int类型的最小值的二进制:"+Integer.toBinaryString(Integer.MIN_VALUE));
}
Integer.toBinaryString()方法就是将整数转换成二进制,最大32位二进制,并且该方法不会自动补位,就是给高位补零。
看一下执行结果
数字2的二进制:10
数字3的二进制:11
数字4的二进制:100
数字5的二进制:101
数字6的二进制:110
数字7的二进制:111
数字8的二进制:1000
int类型的最大值:2147483647
int类型的最大值的二进制:1111111111111111111111111111111
int类型的最小值:-2147483648
int类型的最小值的二进制:10000000000000000000000000000000
二进制相关计算
1.按位与运算符(& 叫做and)
规则:0&0 = 0, 0&1 = 0, 1&1=1.也就是说两位同时为1,结果为1,否则为0
例子: 3 & 5 = 1.(000011 & 000101 = 000001)
2.按位或运算符( | 或者叫做or)
规则:0|0 = 0,1|0 = 1,0|1 = 1,1|1 = 1 参加位运算的两位只要有一个为1,那么就为1
例子:3 | 5 = 7(0000011 | 00000101 = 0000111)
3.异或运算符(^ 也叫xor(以后做题会遇到xor,就是异或))
规则:0^0 = 0,01=1,10=1,1^1=0 参加位运算的两位只要相同为0,不同为1
例子:3^5 = 6(00000011^00000101=00000110)
特别的任意数 ^ 0 = 任意数.
4.取反运算符(~)
规则:二进制位0变为1,1变为0
5.左移(<<)
规则:相当于乘以2
6.右移(>>)
规则:相当于除以2
查看线程池的相关参数
线程池一些关键常量的定义
// 线程池 运行状态 + 存活线程数 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获得当前线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取当前线程池存活的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 当前线程池标识:运行状态 + 存活线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }
写一个自动补位的将数字转换成32位二进制的方法
/**
* 数字转换成32位二进制
*
* @param number
* @return
*/
public static String ComplementZero(Integer number) {
String result = Integer.toBinaryString(number);
final int LENGTH = 32;
if (result.length() < LENGTH) {
StringBuilder stringBuilder = new StringBuilder(result);
int n = LENGTH - result.length();
for (int i = 0; i < n; i++) {
stringBuilder = new StringBuilder("0").append(stringBuilder);
}
return stringBuilder.toString();
}
return result;
}
执行线程池的相关代码,我们来找找规律
public static void main(String[] args) {
// 正在运行:接受新任务并处理排队的任务
System.out.println("RUNNING " + ComplementZero(RUNNING));
// 关机:不接受新任务,但处理排队的任务
System.out.println("SHUTDOWN " + ComplementZero(SHUTDOWN));
// 停止:不要接受新的任务,不要处理排队的任务,并中断正在进行的任务
System.out.println("STOP " + ComplementZero(STOP));
// 整理:所有任务都已终止,workerCount为零,线程正在转换为状态清理将运行 terminated() 钩子方法
System.out.println("TIDYING " + ComplementZero(TIDYING));
// 已终止:TERMINATED()已完成
System.out.println("TERMINATE " + ComplementZero(TERMINATED));
// 线程池容量:五亿多个线程,
System.out.println("CAPACITY " + ComplementZero(CAPACITY));
}
执行结果:
RUNNING 11100000000000000000000000000000
SHUTDOWN 00000000000000000000000000000000
STOP 00100000000000000000000000000000
TIDYING 01000000000000000000000000000000
TERMINATE 01100000000000000000000000000000
CAPACITY 00011111111111111111111111111111
Process finished with exit code 0
先解释一下:rs:runningStatus(运行状态) wc:workerCount(worker数量)
线程池将32位的二进制数字的前三位做成了标识线程池的运行状态,后29位做成了表示线程池运行的线程数量,先知道了这点,我们可以看一下打印结果对应的二进制的前三位。
RUNNING 111
SHUTDOWN 000
STOP 001
TIDYING 010
TERMINATE 011
这就是对应的线程池的状态。但是在计算机中,会对这样的二进制自动补位。
再看一下线程池容量capacity的二进制表示,我们说过前三位标识线程池状态,所以我们看后面的29位,是 1 1111 1111 1111 1111 1111 1111 1111转换成数字是536870911,等于最大线程数可以有五亿多条。
CAPACITY 00011111111111111111111111111111
内部线程Worker类的介绍
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
实际上Worker是一个继承AQS框架,实现Runnable接口的。
二、核心方法的介绍
线程池的execute()方法
public void execute(Runnable command) {
// 校验worker对象,不能为空
if (command == null)
throw new NullPointerException();
/*
分三步进行:
1.如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个线程,启动一个新线程任务对addWorker的调用会自动检查运行状态和workerCount,从而防止可能增加在不应该的情况下,返回false。
2.如果任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程(因为自上次检查以来,已有的已经死亡)或者自进入此方法后,池已关闭。所以我们重新检查状态,如有必要,在以下情况下回滚排队已停止,如果没有线程,则启动新线程。
3.如果无法将任务排队,则尝试添加新任务线如果失败了,我们知道我们已经被关闭或饱和了所以拒绝这个任务。
*/
// 获取线程池标识(c转换成二进制前三位标识线程池状态,后29位标识表示存活的线程数)
int c = ctl.get();
// 存活的线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 加入队列,并且标识为核心线程标志
if (addWorker(command, true))
return;
// 更新当前的线程池状态标识
c = ctl.get();
}
// 如果当前线程池是运行状态 && 队列末尾能加入新的任务线程
if (isRunning(c) && workQueue.offer(command)) {
// 拿到最新的线程池状态(加入队列的任务线程后结算的状态)
int recheck = ctl.get();
// 最新的线程池的状态不是运行状态 && 移除刚刚的任务线程成功
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 否在检查最新的线程存活数是不是0
else if (workerCountOf(recheck) == 0)
// 如果是0(核心线程满了),创建非核心线程
addWorker(null, false);
}
// 如果此刻新的任务无法加入队列(以非核心线程方式)
else if (!addWorker(command, false))
// 执行拒绝策略
reject(command);
}
线程池的addWorker()方法
/**
** firstTask 实现runnable接口的任务
** core 是否是核心线程
**/
private boolean addWorker(Runnable firstTask, boolean core) {
// 循环的入口标志
retry:
// 死循环
for (;;) {
// 拿到当前线程池的标识 (状态+worker数)
int c = ctl.get();
// 获取当前的线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取当前worker数量
int wc = workerCountOf(c);
// 如果worker数量大于等于线程池可以容纳的最大值(五亿)
// 或者当前 worker数量大于(核心线程数,最大线程数),根据是否是核心线程标识去比较对应的值
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas方法增加线程数成功
if (compareAndIncrementWorkerCount(c))
// 跳到最开始的循环标识位,重新进入循环
break retry;
// 获取当前线程标识
c = ctl.get(); // Re-read ctl
// 如果当前线程池状态与加入一个线程后的状态不一样
if (runStateOf(c) != rs)
// 重新进入循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// worker开始标致
boolean workerStarted = false;
// worker加入队列成功标识
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 设置重入锁,并上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 再次检查线程池当前状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// HashSet<Worker> workers = new HashSet<Worker>(),workers是worker的Set集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
小结
线程池的工作过程的设计很复杂,主要还是需要自己去debug去跟踪,理解里面的各个属性和方法的意义。慢慢一步一步的揭开里面的奥秘。
网友评论