美文网首页Java World即时通讯netty
netty源码分析之揭开reactor线程的面纱(二)

netty源码分析之揭开reactor线程的面纱(二)

作者: 简书闪电侠 | 来源:发表于2016-11-28 07:08 被阅读3809次

    如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下

    reactor线程

    我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节

    我们进入到reactor线程的 run 方法,找到处理IO事件的代码,如下

    processSelectedKeys();
    

    跟进去

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    

    我们发现处理IO事件,netty有两种选择,从名字上看,一种是处理优化过的selectedKeys,一种是正常的处理

    我们对优化过的selectedKeys的处理稍微展开一下,看看netty是如何优化的,我们查看 selectedKeys 被引用过的地方,有如下代码

    private SelectedSelectionKeySet selectedKeys;
    
    private Selector NioEventLoop.openSelector() {
        //...
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        // selectorImplClass -> sun.nio.ch.SelectorImpl
        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
        selectedKeysField.setAccessible(true);
        publicSelectedKeysField.setAccessible(true);
        selectedKeysField.set(selector, selectedKeySet);
        publicSelectedKeysField.set(selector, selectedKeySet);
        //...
        selectedKeys = selectedKeySet;
    }
    

    首先,selectedKeys是一个 SelectedSelectionKeySet 类对象,在NioEventLoopopenSelector 方法中创建,之后就通过反射将selectedKeys与 sun.nio.ch.SelectorImpl 中的两个field绑定

    sun.nio.ch.SelectorImpl 中我们可以看到,这两个field其实是两个HashSet

    // Public views of the key sets
    private Set<SelectionKey> publicKeys;             // Immutable
    private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
    protected SelectorImpl(SelectorProvider sp) {
        super(sp);
        keys = new HashSet<SelectionKey>();
        selectedKeys = new HashSet<SelectionKey>();
        if (Util.atBugLevel("1.4")) {
            publicKeys = keys;
            publicSelectedKeys = selectedKeys;
        } else {
            publicKeys = Collections.unmodifiableSet(keys);
            publicSelectedKeys = Util.ungrowableSet(selectedKeys);
        }
    }
    

    selector在调用select()族方法的时候,如果有IO事件发生,就会往里面的两个field中塞相应的selectionKey(具体怎么塞有待研究),即相当于往一个hashSet中add元素,既然netty通过反射将jdk中的两个field替换掉,那我们就应该意识到是不是netty自定义的SelectedSelectionKeySetadd方法做了某些优化呢?

    带着这个疑问,我们进入到 SelectedSelectionKeySet 类中探个究竟

    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    
        private SelectionKey[] keysA;
        private int keysASize;
        private SelectionKey[] keysB;
        private int keysBSize;
        private boolean isA = true;
    
        SelectedSelectionKeySet() {
            keysA = new SelectionKey[1024];
            keysB = keysA.clone();
        }
    
        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            if (isA) {
                int size = keysASize;
                keysA[size ++] = o;
                keysASize = size;
                if (size == keysA.length) {
                    doubleCapacityA();
                }
            } else {
                int size = keysBSize;
                keysB[size ++] = o;
                keysBSize = size;
                if (size == keysB.length) {
                    doubleCapacityB();
                }
            }
    
            return true;
        }
    
        private void doubleCapacityA() {
            SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
            System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
            keysA = newKeysA;
        }
    
        private void doubleCapacityB() {
            SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
            System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
            keysB = newKeysB;
        }
    
        SelectionKey[] flip() {
            if (isA) {
                isA = false;
                keysA[keysASize] = null;
                keysBSize = 0;
                return keysA;
            } else {
                isA = true;
                keysB[keysBSize] = null;
                keysASize = 0;
                return keysB;
            }
        }
    
        @Override
        public int size() {
            if (isA) {
                return keysASize;
            } else {
                return keysBSize;
            }
        }
    
        @Override
        public boolean remove(Object o) {
            return false;
        }
    
        @Override
        public boolean contains(Object o) {
            return false;
        }
    
        @Override
        public Iterator<SelectionKey> iterator() {
            throw new UnsupportedOperationException();
        }
    }
    

    该类其实很简单,继承了 AbstractSet,说明该类可以当作一个set来用,但是底层使用两个数组来交替使用,在add方法中,判断当前使用哪个数组,找到对应的数组,然后经历下面三个步骤
    1.将SelectionKey塞到该数组的逻辑尾部
    2.更新该数组的逻辑长度+1
    3.如果该数组的逻辑长度等于数组的物理长度,就将该数组扩容

    我们可以看到,待程序跑过一段时间,等数组的长度足够长,每次在轮询到nio事件的时候,netty只需要O(1)的时间复杂度就能将 SelectionKey 塞到 set中去,而jdk底层使用的hashSet需要O(lgn)的时间复杂度

    这里关于为何使用两个数组循环交替使用,其实我也是很费解,思考了很久,查找SelectedSelectionKeySet 所有使用的地方,我觉得使用一个数组就能够达到优化目的,并且不用每次都判断使用哪个数组,所以对于该问题,我提了一个issue给netty官方,官方也给出了答复说会跟进,issue链接:https://github.com/netty/netty/issues/6058#,目前在4.1.9.Final 版本中,netty已经将SelectedSelectionKeySet.java 底层使用一个数组了,链接

    关于netty对SelectionKeySet的优化我们暂时就跟这么多,下面我们继续跟netty对IO事件的处理,转到processSelectedKeysOptimized

     private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
         for (int i = 0;; i ++) {
             // 1.取出IO事件以及对应的channel
             final SelectionKey k = selectedKeys[i];
             if (k == null) {
                 break;
             }
             selectedKeys[i] = null;
             final Object a = k.attachment();
             // 2.处理该channel
             if (a instanceof AbstractNioChannel) {
                 processSelectedKey(k, (AbstractNioChannel) a);
             } else {
                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                 processSelectedKey(k, task);
             }
             // 3.判断是否该再来次轮询
             if (needsToSelectAgain) {
                 for (;;) {
                     i++;
                     if (selectedKeys[i] == null) {
                         break;
                     }
                     selectedKeys[i] = null;
                 }
                 selectAgain();
                 selectedKeys = this.selectedKeys.flip();
                 i = -1;
             }
         }
     }
    

    我们可以将该过程分为以下三个步骤

    1.取出IO事件以及对应的netty channel类

    这里其实也能体会到优化过的 SelectedSelectionKeySet 的好处,遍历的时候遍历的是数组,相对jdk原生的HashSet效率有所提高

    拿到当前SelectionKey之后,将selectedKeys[i]置为null,这里简单解释一下这么做的理由:想象一下这种场景,假设一个NioEventLoop平均每次轮询出N个IO事件,高峰期轮询出3N个事件,那么selectedKeys的物理长度要大于等于3N,如果每次处理这些key,不置selectedKeys[i]为空,那么高峰期一过,这些保存在数组尾部的selectedKeys[i]对应的SelectionKey将一直无法被回收,SelectionKey对应的对象可能不大,但是要知道,它可是有attachment的,这里的attachment具体是什么下面会讲到,但是有一点我们必须清楚,attachment可能很大,这样一来,这些元素是GC root可达的,很容易造成gc不掉,内存泄漏就发生了

    这个bug在 4.0.19.Final版本中被修复,建议使用netty的项目升级到最新版本^^

    2.处理该channel

    拿到对应的attachment之后,netty做了如下判断

    if (a instanceof AbstractNioChannel) {
        processSelectedKey(k, (AbstractNioChannel) a);
    } 
    

    源码读到这,我们需要思考为啥会有这么一条判断,凭什么说attachment可能会是 AbstractNioChannel对象?

    我们的思路应该是找到底层selector, 然后在selector调用register方法的时候,看一下注册到selector上的对象到底是什么鬼,我们使用intellij的全局搜索引用功能,最终在 AbstractNioChannel中搜索到如下方法

    protected void doRegister() throws Exception {
        // ...
        selectionKey = javaChannel().register(eventLoop().selector, 0, this);
        // ...
    }
    

    javaChannel() 返回netty类AbstractChannel对应的jdk底层channel对象

    protected SelectableChannel javaChannel() {
        return ch;
    }
    

    我们查看到SelectableChannel方法,结合netty的 doRegister() 方法,我们不难推论出,netty的轮询注册机制其实是将AbstractNioChannel内部的jdk类SelectableChannel对象注册到jdk类Selctor对象上去,并且将AbstractNioChannel作为SelectableChannel对象的一个attachment附属上,这样再jdk轮询出某条SelectableChannel有IO事件发生时,就可以直接取出AbstractNioChannel进行后续操作

    下面是jdk中的register方法

         //*
         //* @param  sel
         //*         The selector with which this channel is to be registered
         //*
         //* @param  ops
         //*         The interest set for the resulting key
         //*
         //* @param  att
         //*         The attachment for the resulting key; may be <tt>null</tt>
    public abstract SelectionKey register(Selector sel, int ops, Object att)
            throws ClosedChannelException;
    

    由于篇幅原因,详细的 processSelectedKey(SelectionKey k, AbstractNioChannel ch) 过程我们单独写一篇文章来详细展开,这里就简单说一下
    1.对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理
    2.对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理

    上面处理attachment的时候,还有个else分支,我们也来分析一下
    else部分的代码如下

    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
    

    说明注册到selctor上的attachment还有另外一中类型,就是 NioTask,NioTask主要是用于当一个 SelectableChannel 注册到selector的时候,执行一些任务

    NioTask的定义

    public interface NioTask<C extends SelectableChannel> {
        void channelReady(C ch, SelectionKey key) throws Exception;
        void channelUnregistered(C ch, Throwable cause) throws Exception;
    }
    

    由于NioTask 在netty内部没有使用的地方,这里不过多展开

    3.判断是否该再来次轮询

    if (needsToSelectAgain) {
        for (;;) {
            i++;
            if (selectedKeys[i] == null) {
                break;
            }
            selectedKeys[i] = null;
        }
        selectAgain();
        selectedKeys = this.selectedKeys.flip();
        i = -1;
    }
    

    我们回忆一下netty的reactor线程经历前两个步骤,分别是抓取产生过的IO事件以及处理IO事件,每次在抓到IO事件之后,都会将 needsToSelectAgain 重置为false,那么什么时候needsToSelectAgain会重新被设置成true呢?

    还是和前面一样的思路,我们使用intellij来帮助我们查看needsToSelectAgain被使用的地方,在NioEventLoop类中,只有下面一处将needsToSelectAgain设置为true

    NioEventLoop.java

    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }
    

    继续查看 cancel 函数被调用的地方

    AbstractChannel.java

    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }
    

    不难看出,在channel从selector上移除的时候,调用cancel函数将key取消,并且当被去掉的key到达 CLEANUP_INTERVAL 的时候,设置needsToSelectAgain为true,CLEANUP_INTERVAL默认值为256

    private static final int CLEANUP_INTERVAL = 256;
    

    也就是说,对于每个NioEventLoop而言,每隔256个channel从selector上移除的时候,就标记 needsToSelectAgain 为true,我们还是跳回到上面这段代码

    if (needsToSelectAgain) {
        for (;;) {
            i++;
            if (selectedKeys[i] == null) {
                break;
            }
            selectedKeys[i] = null;
        }
        selectAgain();
        selectedKeys = this.selectedKeys.flip();
        i = -1;
    }
    

    每满256次,就会进入到if的代码块,首先,将selectedKeys的内部数组全部清空,方便被jvm垃圾回收,然后重新调用selectAgain重新填装一下 selectionKey

    private void selectAgain() {
        needsToSelectAgain = false;
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }
    

    netty这么做的目的我想应该是每隔256次channel断线,重新清理一下selectionKey,保证现存的SelectionKey及时有效

    到这里,我们初次阅读源码的时候对reactor的第二个步骤的了解已经足够了。总结一下:netty的reactor线程第二步做的事情为处理IO事件,netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法

    下一篇文章,我们将一起来看下netty中reactor线程中最后一步,runTasks,你将了解到netty中异步执行任务机制的细节,尽请期待

    最后,仅在简书博客的最后,介绍一下今天新开的知识星球,希望能给你的源码之旅多一点陪伴和指导。

    1.每天会回答大家有关netty网络编程方面的问题,如果你有困惑,尽管提出来,每一条我都会回答
    2.不定期分享netty以及程序员效率相关的知识,确保每条知识对你都受用
    3.星球试运营不做推广,目前所有的用户都来自我的简书博客,志同道合的人在一起,擦出不一样的火花
    4.一个人的学习是寂寞的,低效的。不断的与交流,思考,并适当的接受指导和挑战,这样的学习才会有乐趣
    5.最后,透露一下,星球第一条知识可以给你免费的惊喜

    image.png

    相关文章

      网友评论

      • 鋒Nic:selector在调用select()方法的时候如果有IO事件发生就会往里面的两个field中塞相应的selectionKey,具体怎么塞我跟了下select()方法是按照如下的方式塞的吧?
        if(var9.clearedCount != var1) {
        var10.channel.translateAndSetReadyOps(var4, var10);
        if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
        WindowsSelectorImpl.this.selectedKeys.add(var10);
        var9.updateCount = var1;
        ++var6;
        }
        } else {
        var10.channel.translateAndUpdateReadyOps(var4, var10);
        if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
        WindowsSelectorImpl.this.selectedKeys.add(var10);
        var9.updateCount = var1;
        ++var6;
        }
        }
        简书闪电侠:某个地方调用add方法
        鋒Nic:@简书闪电侠 那Mac这段是怎么塞到那两个field的
        简书闪电侠:@鋒Nic 这个是window上的哇,mac
        貌似是这段
        int poll(long timeout) {
        updateRegistrations();
        int updated = kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout);
        return updated;
        }
        private native int kevent0(int kq, long keventAddress, int keventCount,
        long timeout);

        走到native下去了
      • 鋒Nic:嗨,doDeregister()在channel从selector上移除的时候调用cancel()方法取消selectionKey并且当被去掉的key到达 CLEANUP_INTERVAL 的时候设置needsToSelectAgain为true,这里并没有去清空seletionKeys数组,而是挪到processSelectedKeys()方法做清理,这边能把清理的工作放到cancel()方法实现吗?感觉这样合理~
      • 快乐王子_ea4e:这里并不对,doRegister中,虽然channel注册了Selector,但是注册的事件是0,并没有注册相应的Accept事件
        简书闪电侠:这里只是一个预先绑定,并没有注册ACCEPT事件
        详细的注册ACCEPT相关的逻辑可以参考:https://www.jianshu.com/p/c5068caab217
      • 9dd25e6200bc:老铁 5.0版本里面 SelectedSelectionKeySet.java又换回用两个数组了 为啥呢
        9dd25e6200bc:@简书闪电侠 多谢 贴一下网上的说明
        https://link.zhihu.com/?target=https%3A//github.com/netty/netty/issues/4466
        简书闪电侠:@远山_b260 5.0 已经废弃了,并不比4.x新:smile:
      • 2c1554247a50:你好,看到你说这里hashset的事件复杂度时o(lgn),不是很明白,在1.8里hashmap中才支持把table中的链表treeify,这边threshold默认是8。但是hashmap在超出loadfactor时就会扩容,我觉得可能用到treeNode的地方不会很多,基本上访问的时间还是可以按O(1)计算的。而且我看的是4.1.13Final,跟这里贴的不太一样,是直接通过array实现的,不支持remove操作,这样的设置真的会优化么?不太理解
      • 益文的圈:使用两个数组循环交替使用是为了多线程读写分离吧?
        简书闪电侠:@sunEven 可以再仔细阅读阅读源码哈,目前4.1.9.Final 已经使用了一个数组了:https://github.com/netty/netty/blob/netty-4.1.9.Final/transport/src/main/java/io/netty/channel/nio/SelectedSelectionKeySet.java
        益文的圈:同时processSelectedKeysOptimized(selectedKeys.flip());会读另一个数组里的数据
        益文的圈:select操作执行的时候往一个数组里放数据。
      • 大胖木杨:刚接触netty,跟着大神的博客学习 :smile:
        简书闪电侠:@tutoumeiqian 一起学习:smile:
      • 7e7919e0103c:不错不错!
      • lxzh504:期待下一篇。对着源码看的,感觉还是需要看大神的分析。
        简书闪电侠:@李昕哲 不要吝啬你的打赏:sunglasses:
        lxzh504: @the_flash 已关注,谢谢鼓励
        简书闪电侠:@李昕哲 一起学习优秀的源码,可以关注微信公众号哦

      本文标题:netty源码分析之揭开reactor线程的面纱(二)

      本文链接:https://www.haomeiwen.com/subject/fvbnpttx.html