文章来自简书 李狄青
要转载请先联系作者,并标明出处。有疑问欢迎留言一起探讨 ~
本文目录:
- 概述
- Thread
- 状态机
- 优先级
- 中断
- Worker
- 一些简单的记录
- 维持中断控制状态
- ThreadFactory
- BlockingQueue
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- RejectExecutionHandler
相关链接:
概述
ThreadPoolExecutor 不是一个人在战斗。它的实现是有多个组件一起配合的。这里做下对他们的身份,一些原理、机制等等做一个简要说明。
这个是后面我们去理解 ThreadPoolExecutor 源码的基础。
Thread
Thread 无需多讲,就是用来执行 Runnable 的线程。JVM 的线程和操作系统的线程是对应的。Thread 很多方法最终都会执行到 native 方法上。Thread 的也是一个很庞大且复杂的类。这次聚焦在它的生命周期,还有关于如何科学地关闭一个线程。
而且多线程执行任务效率未必高,因为涉及到上下文切换的问题。比如两个线程争用一个 CPU,切换线程 到时候,切换出去的线程的运行状态要保存,下次切换回来的时候要恢复。比如程序计数器,CPU 寄存器等等的数据。
状态机
一个线程又这几个状态:
-
NEW
,新创建。 -
RUNNALE
,可运行。 -
BLOCKED
,被阻塞。 -
WAITING
,等待。 -
TIMED_WAITING
,计时等待。 -
TERMINATED
,被终止。
完整的线程状态机如下,该状态机从网上的其他博客收集并且又做了些整理,如图:
线程状态机.png优先级
线程的优先级有 1-10,默认优先级是 5。Java 定义了这几个常量用来表示线程优先级:
- MIN_PRIORITY = 1,最小优先级。
- NORM_PRIORITY = 5,普通优先级。
- MAX_PRIORITY = 10,最高优先级。
线程优先级体现的是争用 CPU 获取到资源并执行的概率,高优先级概率高。
一个新创建的线程,优先级和创建它的线程一样。比如创建线程会走 init 方法,里面有设置优先级的代码;
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc) {
...
Thread parent = currentThread();
...
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
...
setPriority(priority);
...
}
也可以通过 setPriority 方法设置:
public final void setPriority(int newPriority) {
ThreadGroup g;
checkAccess();
if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) {
throw new IllegalArgumentException();
}
if((g = getThreadGroup()) != null) {
if (newPriority > g.getMaxPriority()) {
newPriority = g.getMaxPriority();
}
setPriority0(priority = newPriority);
}
}
中断
中断是退出线程的一种方式。也是最科学的方式。
调用 Thread 的 interrupt() 方法后,线程的中断状态会被设置为 true。并且会有这些情况发生:
- 线程被阻塞,比如调用了 object.wait(),thread.join(),thread.sleep(),中断标记会被清除,然后抛出 InterruptedException。
- 如果没有被阻塞的话,中断标志会被设置为 true。
线程中的一些任务的执行中,有的是会去判断线程的中断标志,如果为 true 的话,有的会主动抛出 InterruptException。
比如 ReentrantLock 的 lockInterruptibly 方法,追踪到它内部的 AQS 实现 sync 的方法 acquireInterruptible
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
这样的应用场景有很多。正确的响应中断是安全退出程序的重要方式。
用来判断线程是否中断有两种方法:
- interrupted
- isInterrupted
这两种方法的区别在于 interruped 会重置中断标记为 false,所以也有清除中断标记的功能。而 isInterrpted 仅仅只是用来测试线程是否被终止。
比如 ReentrantLock 内部的 AQS 实现 sync 的 acquireInterruptibly 方法,调用 interrupted 后同时清空了中断标记。
Worker
Worker 是 ThreadPoolExecutor 的内部类,继承 AQS(AbstractQueuedSynchronizer),实现 Runnable 接口。在创建 Worker 实例的时候,会从 ThreadFactory 中生产一个 Thread,并且将当前 Worker 设置为 Thread 的任务。
ThreadPoolExecutor 的所有工作线程都被封装在 Worker 中。整个 Worker 的定义如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker 继承了 AQS,也实现了 Runnable 接口。
Worker 对线程做了增强处理,主要有这几个:
一些简单的记录
哪些记录呢?看代码:
/**
* Thread this worker is running in. Null if factory fails.
*/
final Thread thread;
/**
* Initial task to run. Possibly null.
*/
Runnable firstTask;
/**
* Per-thread task counter
*/
volatile long completedTasks;
无非三个:
- 当前 Worker 绑定的线程,也是运行该 Worker 的线程。
- 该线程要执行的第一个任务。可能为 null。为 null 的话线程会直接去队列中拿。
- 该线程一共执行了多少任务
completedTasks
。
比如记录的第一个执行的任务,可以在 runWorker 中使用,让线程不再去从队列中取任务。
而 completedTasks
可以用来统记线程池一共执行的任务数。ThreadPoolExecutor 的 getCompletedTaskCount 方法:
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
维持中断控制状态
线程是否可被中断的状态有 Worker 维持。
具体就是通过实现了 AQS 的方法实现了简单的锁。在执行任务的过程,执行前会去获取锁,执行后会去释放锁。所以持有锁表示线程正在执行任务,不可以中断。Worker 提供了以下方法进行锁一些操作,其实也就是可中断状态的修改和获取:
- lock
- tryLock
- unlock
- isLocked
和 ReentrantLock 是不是很相似?是的,其实就是一种锁的实现。这里的锁可以用来表示线程是否在执行任务。
我们知道 ReentrantLock 也是 AQS 实现的,直接维持一个 ReentranLock 不就可以了,为什么要再实现一个?
那是因为不希望用到 ReentrantLock 的可重入特性。
为了理解这个点,我们看什么时候会去中断线程。中断意味着退出线程,要关闭线程池的时候会执行 ThreadPoolExecutor 的:
- shutdown
- tryTerminate
最终会调用到 interruptIdleWorkers 方法,去中断那些空闲的线程。但是不仅仅是线程关闭操作调用了 interruptIdleWorkers,还有一些修改线程池配置的时候也会触发 interruptIdleWorkers:
- allowCoreThreadTimeOut
- setCorePoolSize
- setKeepAliveTime
- setMaximumPoolSize
如果使用 ReentrantLock 的话,在执行中的 Runnable 中执行 ThreadPoolExecutor 的这些修改配置的方法时是可以拿到锁的,也就是可以去调用 interruptIdleWorkers,而因为 Runnable 已经持有锁, tryLock 是可以拿到锁的,这个会导致线程异常中断的发生。
所以 Worker 自己实现了不可重入锁,避免了运行的线程调用一些配置方法导致线程被中断的问题。
这里把 ReentrantLock 和 Worker 的 tryLock 方法的实现做一下对比。首先是可重入锁 ReentrantLock 的实现,可以追踪到这个代码
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
sync 就是 ReentrantLock 的 AbstractQueuedSynchronizer 的实现。nofairTryAcquire 方法如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
getState 的值,0 表示没有锁,0 以上表示已经被锁了。然后 ReentrantLock 的 Sync 实现,会继续判断持有锁的线程是否是当前线程 current == getExclusiveOwnerThread()
,如果是的话还可以继续拿到锁,并增加 state 的值。所以可重入的实现就在这儿。
然后我们再看看 Worker 的实现:
public boolean tryLock() { return tryAcquire(1); }
也是调用 AQS 的 tryAcquire,这里的实现很简单:
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
可以看到,没有可重入的概念,有锁才能拿,即使当前线程已经持有锁了,也不会再重新获得锁。所以不用担忧在线程在执行的任务中,触发 interruptIdleWorkers 把自己给中断了。
ThreadFactory
线程工厂,用来生产线程,定义如下:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
在 Worker 的构造函数里,使用了该类来创建新线程:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
对于新创建的线程,可以做这些配置:
- 设置线程名。
- 设置为守护线程(不推荐)。
- 设置线程优先级(不推荐)。
工具类 Executors 内置了一个线程工厂的默认实现,我们可以拿来做参考:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
这样运行起来的线程会有这样的名字
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
...
可以看到这里的线程计数方式,使用了一个 AtomicInteger 来实现,内部用 CAS 来保证线程安全。
同时这个线程工厂的线程,为非守护线程,优先级为NORM_PRIORITY
BlockingQueue
顾名思义,就是阻塞队列。整个接口的定义如下:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
这里的阻塞是什么意思?就像生产者消费者模型的场景一样:
- 如果队列为空,取数据的方法可以阻塞等待,直到队列有数据。
- 如果队列不能加入数据(比如满了),存数据的方法可以阻塞等待,直到队列可用。
线程池的任务队列就是 BlockingQueue 实例,线程池的使用是一个完整的生产者和消费者模型。线程池内部的线程是消费者,而使用线程池执行任务的调用方为生产者。
线程池中线程从 BlockingQueue 取任务,没有任务就会阻塞等待。在阻塞等待新任务到来的这个期间又被称为空闲时间。像线程池中我们设置了等待超时时间 keepAliveTime,就是通过 BlockingQueue 的 poll 方法实现的,超时就返回 null。
E poll(long timeout, TimeUnit unit)
如果不受 keepAliveTime 的超时时间控制的线程,比如核心线程,且 allowCoreThreadTimeout 设置为 false 的情况下,会调用 take 方法取任务,会一直阻塞直到有线程到达,或者被中断。
E take() throws InterruptedException;
阻塞队列 JDK 提供了多种实现,都可以应用到线程池中。我们比较常用的有这三种:
ArrayBlockingQueue
有界队列,FIFO,内部是数组实现。创建时必须指定容量 capacity。它使用的数组定义如下:
final Object[] items;
内部使用了 ReentrantLock 和 Condition 来实现阻塞等待数据的获取,比如 take 方法如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
每次被唤醒的时候都会去检查一下队列中有没有数据,没有的话调用 notEmpty.await()
继续等待。
LinkedBlockingQueue
可以是有界队列,也可以是无界队列,FIFO。内部是链表实现。实例化的时候可以选择是否要实例化 capacity 大小。工具类 Executors 创建的固定线程线程池 newFixedThreadPool
和单线程线程池 newSingleThreadPool
使用的就是无界的 LinkedBlockingQueue。这个链表的每个节点定义如下:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
我们也看看它 的 take 方法:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
也是使用 ReentrantLock 和 Condition 来完成阻塞读数据的功能。不过这里使用了两个锁,putLock 和 takeLock,所以 count 的计数可能会被多个线程同时触发,这里使用 AtomicInteger 来实现线程安全。
SynchronousQueue
该队列没有实际大小,capacity 始终为空,作用就是做个中转,把生产者生产的数据传给消费者,而不会做停留。工具类 Executors 创建的缓存线程池 newCacheThreadPool
使用的就是该队列。看一下取数据的 take 方法:
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
内部使用了 Transferer 用来为生产者和消费者传递队列数据。
RejectExecutionHandler
当执行 execute 方法时,线程池已满,队列已满,或者线程池已经进入关闭阶段,会拒绝执行该 Runnable,然后把 Runnable 会交给一个 RejectExecutionHandler 的实例去处理。
看 RejectExecutionHandler 的定义:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
线程池里有这几种实现类:
CallerRunsPolicy ,让调用者去执行。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
AbortPolicy,线程池默认使用该策略,很简单粗暴,直接抛出 RejectedExecutionException 异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
DiscardPolicy ,实现了空方法,正如它的名字所言,忽略了这个被拒绝的任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
DiscardOldestPolicy ,先抛弃线程池任务队列中尚未执行的任务,然后再尝试调用 execute 方法。如果线程池已经是 SHUTDOWN 状态,也不会任何处理:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
如果我们的线程池设计,是有可能出现没有线程可以运行任务,而且任务队列也满了的情况下,最好实现一个 RejectExecutionHandler 来处理线程池无法处理的任务。
网友评论