美文网首页Java 并发
Java线程源码解析之interrupt

Java线程源码解析之interrupt

作者: allanYan | 来源:发表于2016-12-16 21:24 被阅读587次

    概述

    Thread提供了interrupt方法,中断线程的执行:

    • 如果线程堵塞在object.wait、Thread.join和Thread.sleep,将会抛出InterruptedException,同时清除线程的中断状态;
    • 如果线程堵塞在java.nio.channels.InterruptibleChannel的IO上,Channel将会被关闭,线程被置为中断状态,并抛出java.nio.channels.ClosedByInterruptException;
    • 如果线程堵塞在java.nio.channels.Selector上,线程被置为中断状态,select方法会马上返回,类似调用wakeup的效果;
    public void interrupt() {
            if (this != Thread.currentThread())
                checkAccess();
    
            synchronized (blockerLock) {
                Interruptible b = blocker;
                if (b != null) {
                    interrupt0();           // Just to set the interrupt flag
                    b.interrupt(this);
                    return;
                }
            }
            interrupt0();
        }
    

    源码实现

    之前在分析Thread.start的时候曾经提到,JavaThread有三个成员变量:

    //用于synchronized同步块和Object.wait() 
    ParkEvent * _ParkEvent ; 
    //用于Thread.sleep() 
    ParkEvent * _SleepEvent ; 
    //用于unsafe.park()/unpark(),供java.util.concurrent.locks.LockSupport调用, 
    //因此它支持了java.util.concurrent的各种锁、条件变量等线程同步操作,是concurrent的实现基础 
    Parker* _parker;
    

    初步猜测interrupt实现应该与此有关系;
    interrupt方法的源码也在jvm.cpp文件:

    JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
      JVMWrapper("JVM_Interrupt");
    
      // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
      oop java_thread = JNIHandles::resolve_non_null(jthread);
      MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
      // We need to re-resolve the java_thread, since a GC might have happened during the
      // acquire of the lock
      JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
      if (thr != NULL) {
        Thread::interrupt(thr);
      }
    JVM_END
    

    JVM_Interrupt对参赛进行了校验,然后直接调用Thread::interrupt:

    void Thread::interrupt(Thread* thread) {
      trace("interrupt", thread);
      debug_only(check_for_dangling_thread_pointer(thread);)
      os::interrupt(thread);
    }
    

    Thread::interrupt调用os::interrupt方法实现,os::interrupt方法定义在os_linux.cpp:

    void os::interrupt(Thread* thread) {
      assert(Thread::current() == thread || Threads_lock->owned_by_self(),
        "possibility of dangling Thread pointer");
    
      //获取系统native线程对象
      OSThread* osthread = thread->osthread();
    
      if (!osthread->interrupted()) {
        osthread->set_interrupted(true);
       //内存屏障,使osthread的interrupted状态对其它线程立即可见
        OrderAccess::fence();
        //前文说过,_SleepEvent用于Thread.sleep,线程调用了sleep方法,则通过unpark唤醒
        ParkEvent * const slp = thread->_SleepEvent ;
        if (slp != NULL) slp->unpark() ;
      }
    
      //_parker用于concurrent相关的锁,此处同样通过unpark唤醒
      if (thread->is_Java_thread())
        ((JavaThread*)thread)->parker()->unpark();
      //synchronized同步块和Object.wait() 唤醒
      ParkEvent * ev = thread->_ParkEvent ;
      if (ev != NULL) ev->unpark() ;
    
    }
    

    由此可见,interrupt其实就是通过ParkEvent的unpark方法唤醒对象;另外要注意:

    • object.wait、Thread.sleep和Thread.join会抛出InterruptedException并清除中断状态;
    • Lock.lock()方法不会响应中断,Lock.lockInterruptibly()方法则会响应中断并抛出异常,区别在于park()等待被唤醒时lock会继续执行park()来等待锁,而 lockInterruptibly会抛出异常;
    • synchronized被唤醒后会尝试获取锁,失败则会通过循环继续park()等待,因此实际上是不会被interrupt()中断的;
    • 一般情况下,抛出异常时,会清空Thread的interrupt状态,在编程时需要注意;

    网络相关的中断

    之前的interrupt方法有这么一段:

    private volatile Interruptible blocker;
    private final Object blockerLock = new Object();
    synchronized (blockerLock) {
                Interruptible b = blocker;
                if (b != null) {
                    interrupt0();           // Just to set the interrupt flag
                    b.interrupt(this);
                    return;
                }
            }
    

    其中blocker是Thread的成员变量,Thread提供了blockedOn方法可以设置blocker:

     void blockedOn(Interruptible b) {
            synchronized (blockerLock) {
                blocker = b;
            }
        }
    

    如果一个nio通道实现了InterruptibleChannel接口,就可以响应interrupt()中断,其原理就在InterruptibleChannel接口的抽象实现类AbstractInterruptibleChannel的方法begin()中:

     protected final void begin() {
            if (interruptor == null) {
                interruptor = new Interruptible() {
                        public void interrupt(Thread target) {
                            synchronized (closeLock) {
                                if (!open)
                                    return;
                                open = false;
                                interrupted = target;
                                try {
                                    AbstractInterruptibleChannel.this.implCloseChannel();
                                } catch (IOException x) { }
                            }
                        }};
            }
            blockedOn(interruptor);//设置当前线程的blocker为interruptor
            Thread me = Thread.currentThread();
            if (me.isInterrupted())
                interruptor.interrupt(me);
        }
    
     protected final void end(boolean completed)
            throws AsynchronousCloseException
        {
            blockedOn(null);//设置当前线程的blocker为null
            Thread interrupted = this.interrupted;
           //如果发生中断,Thread.interrupt方法会调用Interruptible的interrupt方法,
          //设置this.interrupted为当前线程
            if (interrupted != null && interrupted == Thread.currentThread()) {
                interrupted = null;
                throw new ClosedByInterruptException();
            }
            if (!completed && !open)
                throw new AsynchronousCloseException();
        }
    
    //Class java.nio.channels.Channels.WritableByteChannelImpl
         public int write(ByteBuffer src) throws IOException {
            ......    
            try {
                begin();
                out.write(buf, 0, bytesToWrite);
            finally {
                end(bytesToWrite > 0);
            }
            ......
        }
    
    //Class java.nio.channels.Channels.ReadableByteChannelImpl
        public int read(ByteBuffer dst) throws IOException {
            ......    
            try {
                begin();
                bytesRead = in.read(buf, 0, bytesToRead);
            finally {
                end(bytesRead > 0);
            }
            ......
        }
    

    以上述代码为例,nio通道的ReadableByteChannel每次执行阻塞方法read()前,都会执行begin(),把Interruptible回调接口注册到当前线程上。当线程中断时,Thread.interrupt()触发回调接口Interruptible关闭io通道,导致read方法返回,最后在finally块中执行end()方法检查中断标记,抛出ClosedByInterruptException;

    Selector的实现类似:

    //java.nio.channels.spi.AbstractSelector
      protected final void begin() {
            if (interruptor == null) {
                interruptor = new Interruptible() {
                        public void interrupt(Thread ignore) {
                            AbstractSelector.this.wakeup();
                        }};
            }
            AbstractInterruptibleChannel.blockedOn(interruptor);
            Thread me = Thread.currentThread();
            if (me.isInterrupted())
                interruptor.interrupt(me);
        }
    protected final void end() {
        AbstractInterruptibleChannel.blockedOn(null);
    }
    //sun.nio.ch.class EPollSelectorImpl
    protected int doSelect(long timeout) throws IOException {
            ......
            try {
                begin();
                pollWrapper.poll(timeout);
            } finally {
                end();
            }
            ......
        }
    

    可以看到当发生中断时会调用wakeup方法唤醒poll方法,但并不会抛出中断异常;

    相关文章

      网友评论

      • 夜月行者:您好,不知道怎么称呼,真心感谢您的这些文章,从中学到很多东西。
        一直想打通java线程和操作系统线程的运行时对应关系,找了好久没有找到,在您这里找到了,真心感谢!

        同时想再请教一个问题,有可能是我看的不够仔细,像 object.wait thread.sleep 能够抛出 InterruptException主要是因为 object.wait 和thread.sleep的实现检查状态位然后抛出了这个异常。
        thread.interrupt只是将这种阻塞态的线程唤醒而已,是这个样子么(这个是我结合你讲的thread sleep源码猜测的,我查阅了一些object.wait的源码没有找到相关的处理),麻烦您了。

      本文标题:Java线程源码解析之interrupt

      本文链接:https://www.haomeiwen.com/subject/ieinmttx.html