[toc]
在前面学习ThreadPoolExecutor的时候,我们知道,当线程池中的线程数量大于核心线程数的时候,或者开启了allowCoreThreadTimeOut参数的时候,那么线程去工作队列获取任务的时候就会适用poll(timeout)方法。一旦工作队列中没有任务的时候,则会适用take方法,这两个方法如果没有数据,都会阻塞。而阻塞之后,则会导致线程池中的线程进入TIME_WAITING或者WAITING状态。那么,如果线程池的核心线程收缩或者其他情况导致需要将线程唤醒,则会调用interrupt方法对线程进行中断。而中断的过程中,需要先获得worker实现的AQS锁。可见中断方法在线程池中的应用至关重要。
实际上关于java的interrupt,在我们涉及多线程的各个环节中都至关重要。今天来对java的中断机制进行深度分析。
1.java的中断机制
中断机制实际上是操作系统中的一个核心功能,是指在CPU的正常运行期间,由于外部事件或者程序预先安排的事件而引起的CPU暂时停止正在运行的程序,转而为该内部和外部事件预先安排的事件服务程序中去,服务完毕之后再返回继续运行暂时被中断的程序。
简单点说就是将CPU正在执行的程序中断,之后执行我们预定某个处理服务,之后再去运行之前中断的程序。这有点类似于我们日常工作中正在干一件时间,中间被人打断,之后再继续工作。因此中断这个词也比较形象。
java的中断机制,实际上式采用的协作式的中断机制。我们认为一个线程只要被调用了interrupt方法,就会立即抛出异常并退出,这个过程应该是同步发生的。实际上并不是,java并没有采用这种抢占式的中断机制。所谓协作式的中断机制,是指我们调用了in
terrupt之后,只是在线程内部放置一个标识,而线程并不会立即被中断,而是在执行过程中,根据情况,有可能是定时轮询等,线程根据这个标识来触发中断的响应处理。
前面有人举了一个例子,就是好比每年过年的时候,家长总是要提醒我们注意身体,但是何时怎么来注意,家长们无法来落实,仅仅只是一个提醒,而至于我们收到了提醒,怎么来做,则完全取决于我们自己。对于线程也一样。
2.Thread提供的有关interrupt的方法
在Thread中,提供了3个与interrupt有关的方法。我们分别来对这三个方法进行分析。
2.1 interrupt
public void interrupt() {
//判断权限 是否有对该线程进行中断的权限
if (this != Thread.currentThread())
checkAccess();
//blockerLock 和 blocker
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
interrupt方法实际上就是这个在系统中做标记的方法。调用interrupt之后,首先判断是否有中断的权限。
/* The object in which this thread is blocked in an interruptible I/O
* operation, if any. The blocker's interrupt method should be invoked
* after setting this thread's interrupt status.
*/
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
而关于blockerLock和blocker,根据注释是说,只有线程在进行I/O操作的时候,才需要用到这个锁。因此在一般的情况下并不会进入这个if分支。
最终调用的是interrupt0的native方法。
我们来看看hotspot中的thread源码:
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
实际上调用的是os的interrupt方法。
而在os的interrupt方法中:
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true);
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
真正起作用的是:
osthread->set_interrupted(true);
这个操作实际上是:
void set_interrupted(bool z) { _interrupted = z ? 1 : 0; }
volatile bool interrupted() const { return _interrupted != 0; }
也就是说,_interrupted就是我们前面说到的中断标识,而interrupt方法,只是将这个值设置为了1。
同时前面有个判断逻辑,确保这个值如果重复调用的话实际上只会执行第一次。
if (!osthread->interrupted())
interrupt方法实际上就是对我们说的这个状态标识进行设置。
2.2 interrupted
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
此方法的目的是,返回当前的中断标识的状态,并将之前状态进行清空。底层调用的是:
private native boolean isInterrupted(boolean ClearInterrupted);
这个方法会根据传入的ClearInterrupted状态,如果为true则会将中断状态重置。
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
trace("is_interrupted", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
// Note: If clear_interrupted==false, this simply fetches and
// returns the value of the field osthread()->interrupted().
return os::is_interrupted(thread, clear_interrupted);
}
实际上调用了is_interrupted方法。
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted();
if (interrupted && clear_interrupted) {
osthread->set_interrupted(false);
// consider thread->_SleepEvent->reset() ... optional optimization
}
return interrupted;
}
在这个方法中,通过set_interrupted进行了重置。
2.3 isInterrupted
public boolean isInterrupted() {
return isInterrupted(false);
}
在学完interruped方法之后,这个方法就很容易理解了,就是获得当前线程的中断标识,而不进行重置。因为此处实际上调用的是isInterrupted方法,而传入的值是false。
3 interrupt对Thread各状态的影响
我们在前面了解过java线程的状态,我们现在来实验一下,在这些状态下,interrupt方法会有什么作用。
线程状态
3.1 NEW
public static void main(String[] args) {
Thread t = new Thread(() -> {
while (true) {
System.out.println(Thread.currentThread().getName()+" : 执行");
}
},"t");
System.out.println("thread t state is :"+t.getState());
System.out.println(t.isInterrupted());
t.interrupt();
System.out.println(t.isInterrupted());
}
执行结果:
thread t state is :NEW
false
false
可见NEW状态下inerrupt是无效的。
3.2 TERMINATED
public static void main(String[] args) throws InterruptedException{
Thread t = new Thread(() -> {
},"t");
t.start();
t.join();
System.out.println("thread t state is :"+t.getState());
System.out.println(t.isInterrupted());
t.interrupt();
System.out.println(t.isInterrupted());
}
执行结果:
thread t state is :TERMINATED
false
false
可见TERMINATED状态下inerrupt也是无效的。
3.3 RUNNABLE
public static void main(String[] args) {
Thread t = new MyThread();
t.start();
System.out.println("thread t state is :"+t.getState());
System.out.println(t.isInterrupted());
t.interrupt();
System.out.println(t.isInterrupted());
}
private static class MyThread extends Thread {
@Override
public void run() {
while (true) {
if(this.isInterrupted()){
System.out.println(Thread.currentThread().getName()+":interrupted");
break;
}
}
}
}
上述代码执行:
thread t state is :RUNNABLE
false
true
Thread-0:interrupted
可见,当线程在RUNNABLE状态的时候,interrupt状态是能够被传递到执行的线程本身去的。因此我们可以自行决定根据线程的状态来自行处理。
3.4 BLOCKED
public class InterruptTestBlock {
private static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException{
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (true) {
}
}
},"t1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
while (true) {
}
}
},"t2");
t1.start();
t2.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("t1 state is:"+t1.getState());
System.out.println("t2 state is:"+t2.getState());
System.out.println(t2.isInterrupted());
t2.interrupt();
System.out.println(t2.isInterrupted());
System.out.println(t2.getState());
}
}
再来测试当线程处于BLOCK状态的情况。上述代码输出:
t1 state is:RUNNABLE
t2 state is:BLOCKED
false
true
BLOCKED
可见BLOCKED状态与RUNNABLE状态一样。都是可以识别到interrupt状态然后来进行处理的。
3.5 WAITING/TIMED_WAITING
这两者状态本质上没有区别:
public class InterruptTestWait {
private static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" interrupted ");
System.out.println(Thread.currentThread().getName()+" isInterrupted is :"+Thread.currentThread().isInterrupted());
System.out.println(Thread.currentThread().getName()+" state is :"+Thread.currentThread().getState());
e.printStackTrace();
}
}
});
t1.start();
TimeUnit.SECONDS.sleep(1);
System.out.println(t1.getState());
TimeUnit.SECONDS.sleep(1);
t1.interrupt();
}
}
WAITING状态下执行结果:
WAITING
Thread-0 interrupted
Thread-0 isInterrupted is :false
Thread-0 state is :RUNNABLE
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at com.dhb.threadpool.InterruptTestWait.lambda$main$0(InterruptTestWait.java:13)
at java.lang.Thread.run(Thread.java:748)
WAITING状态下给了我们一个异常信号,我们根据这个异常决定要采取何种处理。TIME_WAITTING的方案实际上也是类似的:
public class InterruptTestTimeWait {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " interrupted ");
System.out.println(Thread.currentThread().getName() + " isInterrupted is :" + Thread.currentThread().isInterrupted());
System.out.println(Thread.currentThread().getName() + " state is :" + Thread.currentThread().getState());
e.printStackTrace();
}
});
t1.start();
TimeUnit.SECONDS.sleep(1);
System.out.println(t1.getState());
TimeUnit.SECONDS.sleep(1);
t1.interrupt();
}
}
执行结果:
TIMED_WAITING
Thread-0 interrupted
Thread-0 isInterrupted is :false
Thread-0 state is :RUNNABLE
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.dhb.threadpool.InterruptTestTimeWait.lambda$main$0(InterruptTestTimeWait.java:11)
at java.lang.Thread.run(Thread.java:748)
4.总结
本文对java中的interrupt机制进行了分析。
在操作系统中,CUP为了在运行时能够相应外部的请求,对外提供了一个中断的中断引脚。CPU在每个指令周期的最后一个晶振周期检查中断引脚,如果有中断任务,则立即停止手中的工作(当然要先保存现场)调用相应中断号的中断处理程序对中断做出响应。进程在运行时为了响应外部请求,对外提供了信号队列。在每次由核心态转为用户态(比如由进程调度方法转到用户进程)时,会先检查自己的信号队列是否存在外部发来的信号,如果有则调用对应信号的信号处理程序对信号做出响应(Linux 下由 OS 在调度某进程前检查其信号队列,如果存在需要处理的信号则开启一个新线程调用信号处理程序)。这实际上是一种轮询机制。由CPU的晶震来触发。
但是程序产生的中断信号总是随机的。我们不知道什么时候会发生这些事件,所以我们只能在现有调度方式的基础上,周期性的检查是否有信号到达。对中断信号的检查是基于处理器对指令的调度方式,每条指令的逻辑执行完成时去检查是否中断信号发生;对进程信号的检查是基于 OS 对进程的调度方式,每次调度该进程前检查是否有信号发生。
JVM 在线程层面为我们提供了类似的方法,线程的 interrupt 信号。
但是这一点与CPU中断有很大不同,主要在于,CPU的中断有操作系统提供支撑,而JVM的中断却受制于操作系统。因此JVM提供了另外一种思路,将中断信号的处理,交给线程的调用者本身来决定。这样,jvm在检测到中断信号之后,只用抛出一个异常,而jaa的调用方法根据这个异常来进行后续的相关处理即可。
可以参考JVM 对 interrupt 信号的响应这篇博客。让我理解了很多底层的相关知识。
最后我们需要注意的是,关于Thread的三个interrupt的方法,一定要掌握其内容,这一定是面试的高频问题。
网友评论