美文网首页程序猿之路
NIO源码分析(三)

NIO源码分析(三)

作者: 三斤牛肉 | 来源:发表于2017-05-03 20:41 被阅读301次

    解释下Channel中accept,read,write中的begin/end函数做了什么

    protected final void begin() {
        if (interruptor == null) {
            interruptor = new Interruptible() {
                    public void interrupt(Thread target) {
                        synchronized (closeLock) {
                            if (!open)
                                return;
                            open = false;
                            interrupted = target;
                            try {
    //这里关闭了当前的channel
                              AbstractInterruptibleChannel.this.implCloseChannel();
                            } catch (IOException x) { }
                        }
                    }};
        }
        blockedOn(interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted())
            interruptor.interrupt(me);//如果当前线程被中断,则优雅的关闭channel
    }
    

    protected final void end(boolean completed)
        throws AsynchronousCloseException
    {
        blockedOn(null);
        Thread interrupted = this.interrupted;
    //如果当前线程被中断,则抛出中断关闭异常,即当前线程被其他线程调用interrupt中断
        if (interrupted != null && interrupted == Thread.currentThread()) {
            interrupted = null;
            throw new ClosedByInterruptException();
        }
    //如果通讯未完成且channel已经关闭,抛出异步关闭异常,即当前channel被其他线程调用close的时候
        if (!completed && !open)
            throw new AsynchronousCloseException();
    }
    

    单步跟踪blockedOn()函数可以得到
    其实等价于
    Thread.currentThread().blockedOn(interruptor);
    由于blockedOn这个函数是protected的,所以需要绕一大圈

    private volatile Interruptible blocker;  
    
    void blockedOn(Interruptible b) {
        synchronized (blockerLock) {
            blocker = b;
        }
    }
    

    这个blocker又干了啥呢,搜索Thead类全文:

    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();
    
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);//看到了吗,当线程执行interrupt的时候就会去执行上面的interruptor类了
                return;
            }
        }
        interrupt0();
    }
    

    所以总结下begin的作用是,给当前的thread设置一个blocker使之可以在线程被中断时优雅的关闭channel,并且最后检测一遍currentThread是否被中断
    end的作用是取消begin设置的blocker,并对线程中断/channel关闭等状态抛出相应异常


    同样在SelectorImpl中也有begin/end对

    protected int doSelect(long timeout)
        throws IOException
    {
        if (closed)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(timeout);
        } finally {
            end();
        }
        ...
    }
    

    protected final void begin() {
        if (interruptor == null) {
            interruptor = new Interruptible() {
                    public void interrupt(Thread ignore) {
                        AbstractSelector.this.wakeup();//这里是唤醒当前selector
                    }};
        }
        AbstractInterruptibleChannel.blockedOn(interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted())
            interruptor.interrupt(me);
    }
    

    end就不贴了,是一样的
    wakeup做了什么?

    public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {//interruptTriggered默认是false的,当close的时候置为true(具体看代码)
                pollWrapper.interrupt();//这里其实调用了native的interrupt(outgoingInterruptFD)方法
    //这里中断了write的fd
                interruptTriggered = true;
            }
        }
        return this;
    }
    

    具体看pollWrapper.interrupt()怎么作用的

    EPollSelectorImpl(SelectorProvider sp) {
        super(sp);
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        pollWrapper = new EPollArrayWrapper();
      //其实在selector构造函数的时候就已经初始化interrupt
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<Integer,SelectionKeyImpl>();
    }
    
    EPollArrayWrapper:
    void initInterrupt(int fd0, int fd1) {
        outgoingInterruptFD = fd1;//写通道
        incomingInterruptFD = fd0;//读通道
        epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);//添加fd0兴趣事件为可读(EPOLLIN)
    }
    

    查看EPollArrayWrapper.c

    JNIEXPORT void JNICALL
    Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
    {
       int fakebuf[1];
        fakebuf[0] = 1;
    //可以看到这里往fd1写入了一个字节,使写通道fd0变成readable状态,selector因为有事件就绪而中止阻塞(上面的epollCtl已经注册了fd0的read事件)
        if (write(fd, fakebuf, 1) < 0) {
            JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
        }
    }
    

    总结selectorImpl的begin同样也设置了一个blocker,
    当this.selector.select()阻塞时,要怎么中断呢,
    selector在初始化的时候会先给自己分配2个用于中断的fd,一个读,一个写,然后给读fd注册一个感兴趣的读事件,这样在中断时只要给写fd写入一个字节,那么读事件就会感知到,使得selector被唤醒。

    相关文章

      网友评论

        本文标题:NIO源码分析(三)

        本文链接:https://www.haomeiwen.com/subject/aryltxtx.html