美文网首页
netty accept事件处理

netty accept事件处理

作者: binecy | 来源:发表于2018-01-03 17:19 被阅读88次

    源码分析基于netty 4

    前面已经说了netty启动过程。接着讲讲netty对accept事件的处理

    前面说过,NioEventLoop的run方法正是loop的核心
    run()比较复杂, 涉及wakenUp,ioRatio等内容, 抽取主要流程如下:

    for (;;) {
        try {
            try {
                processSelectedKeys();
            } finally {
                // Ensure we always run tasks.
                runAllTasks();
            }
        } catch (Throwable t) {
                    handleLoopException(t);
                }                
    }    
    

    processSelectedKeys处理io事件

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    

    一般会走processSelectedKeysOptimized逻辑

    NioEventLoop中的selectedKeys是netty自定义的SelectedSelectionKeySet,继承自AbstractSet,里面有两个SelectionKey数组, add时会添加key到其中一个数组, flip方法会取出当前存储的数组, 并修改存储标示使add添加到另一数组。

    NioEventLoop中聚合了java.nio.channels.Selector。注意openSelector方法,通过反射, 将SelectedSelectionKeySet设置到Selector的selectedKeys属性, 这样jdk底层产生的selectedKey会add到SelectedSelectionKeySet中

        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
        selectedKeysField.setAccessible(true);
        publicSelectedKeysField.setAccessible(true);
    
        selectedKeysField.set(selector, selectedKeySet);
        publicSelectedKeysField.set(selector, selectedKeySet);
    

    回到processSelectedKeysOptimized方法

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            
    
            final Object a = k.attachment();
    
            if (a instanceof AbstractNioChannel) {
                // 网络事件
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                // 
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
        }
    }
    

    网络事件会触发processSelectedKey(SelectionKey k, AbstractNioChannel ch):

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
    
            unsafe.finishConnect();
        }
    
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
    
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    }
    

    可以看到这里netty对各种网络事件做了处理

    这里只关注OP_READ和OP_ACCEPT事件,来看对应的处理方法,主要是unsafe.read()方法(虽然方法名为read,但netty将它作为基本逻辑处理抽象,它也可以处理其他事件,如accept),该方法是抽象方法,有两个实现类:NioByteUnsafe和AbstractNioMessageChannel

    回顾前面说得的启动过程,channel注册时,NioServerSocketChannel注册了eventloop.selector的OP_READ事件, 所以当OP_READ发生时,会调用到NioMessageUnsafe.read(NioServerSocketChannel继承自AbstractNioMessageChannel)

    public void read() {
        ...
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        do {
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            
            allocHandle.incMessagesRead(localRead);
        } while (allocHandle.continueReading());
    
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 触发read事件
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // 触发readComplete事件
        pipeline.fireChannelReadComplete();
    }   
    

    这里主要是通过模板方法doReadMessages, 将readBuf交给子类处理, 所以来看看NioServerSocketChannel实现的doReadMessages方法:

    SocketChannel ch = javaChannel().accept();
    
    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    

    比较简单, 就是把accpet的java.nio.channels.SocketChannel封装为netty的NioSocketChannel对象
    值得注意的是, NioSocketChannel的构造过程中, 会调用到父类AbstractNioByteChannel如下构造方法:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
    

    主要之前说过的readInterestOp属性,这里NioSocketChannel关注的事件是OP_READ

    接下来,会触发pipeline事件,
    这里的pipeline.fireChannelRead事件比较重要(pipeline机制后面会讲), 他会触发ServerBootstrapAcceptor.channelRead方法(前面说过了, channl注册时会把ServerBootstrapAcceptor添加到pipeline中):

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
    
        // 添加用户定义的Handler到pipeline
        child.pipeline().addLast(childHandler);
        
        // 注册NioSocketChannel到childGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    }
    

    这里把NioSocketChannel注册到EventLoop中, 注册过程前面说过了。
    要注意的是,这里注册的loop是childGroup,注册的事件是OP_READ。到这里,childGroup总算启动了。

    相关文章

      网友评论

          本文标题:netty accept事件处理

          本文链接:https://www.haomeiwen.com/subject/fdfknxtx.html