美文网首页小白架构师之路
Nety线程模型2(Accept篇)

Nety线程模型2(Accept篇)

作者: linking12 | 来源:发表于2016-08-22 17:49 被阅读239次

    概述

    Netty的线程模型没什么出彩的地方,旧瓶装新酒,其就是基于Reactor模式

    Reactor模式结构

    首先用一张许多人都看过的图来开始说明Reactor模式

    Netty Reactor.jpeg

    这张图估计在许多博客和帖子都会看到,但是许多博客却没有详细说明及解释这张图在netty的架构上反应出来

    • Reactor模式角色定义
      1:MainReactor: 负责响应client的连接请求,并建立连接,可以有1个或者多个,维护一个独立的NIO Selector
      2:SubReactor: 负责对client的读写请求进行处理,可以1个或者多个,并也维护一个独立的NIO Selector
      3:Acceptor: 负责MainReactor和SubReactor的桥梁左右,已经准备好的连接转发到SubReactor中进行处理

    • Netty中Reactor模式角色定义
      1:MainReactorEventLoopGroup,在Netty4以前叫做BossGroup;
      2:SubReactor:EventLoopGroup, 在Netty4以前叫做WorkGroup;
      3:Acceptor:ServerBootstrapAcceptor:一个系统自带的ChannelInboundHandler事件拦截器,真正的将已准备好的Channel注册到SubReactor中;

    SubReactor:EventLoopGroup

    subReactor的任务比较简单,接收Acceptor的Channel,后将Channel重新进行注册,并触发自定义的Handler来处理逻辑
    1)接收Acceptor传递过来的Channel通道
    2)注册到相应的selector。

    private void register0(ChannelPromise promise) {
                try {
                    doRegister();
                    registered = true;
                    promise.setSuccess();
                    pipeline.fireChannelRegistered();
                    if (isActive()) {
                        pipeline.fireChannelActive();
                    }
                } catch (Throwable t) {
                }
            }
    

    3)调用eventLoop.execute用以执行注册任务
    4)启动子线程。即启动了subReactor

    注意:pipeline.fireChannelRegistered()触发的事件,其实现原理就是初始化不同的ChannelInitializer对象,对不同类型的Channel添加不同的拦截处理

    • 启动时,Channel是NioServerSocketChannel,调用的是ServerBootstrapAcceptor
    • 连接时,Channel是NioSocketChannel,调用的是用户自定义的InboundHandler

    例如:

     ServerBootstrap.childHandler(new GearmanServerInitializer())//此用户自定义的的ChannelInitializer
    
    public class GearmanServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            pipeline.addLast("decoder", new Decoder());
            pipeline.addLast("encoder", new Encoder());
            pipeline.addLast("handler", new PacketHandler(networkManager));
        }
    }
    
    Acceptor:ServerBootstrapAcceptor
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                Channel child = (Channel) msg;
                child.pipeline().addLast(childHandler);
                // ...... 省略无关代码
                try {
                       childGroup.register(child);
                } catch (Throwable t) {
                  // ......省略无关代码
                }
            }
    

    说明:
    当Channel已经Ready后,就会ServerBootstrapAcceptor#channelRead
    1:首先把用户自定义的handler注册到pipleline中
    2:将已准备好的Channel与childGroup,触发点就是childGroup.register(child);

    如果看了上篇文章,会发现两者都是存在注册通道的原理,其实是不同的

    1. 在server启动时,通过回调bind的监听会把Selector注册事件改为electionKey.OP_ACCEPT
    2. 而当有连接进来的时候,通过重新注册又把Selector注册事件改为了0
      在这一点有点劳民伤财的味道。(其实不是,在下面会专门有注意点提到这点)
    MainReactor:EventLoopGroup

    如果看了上篇文章,应该知道在server启动时,会启动MainReactor,一直循环执行IO任务和非IO任务

     protected void run() {
            for (;;) {
                oldWakenUp = wakenUp.getAndSet(false);
                try {
                    if (hasTasks()) {
                        selectNow();
                    } else {
                        select();
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    }
                    cancelledKeys = 0;
                    final long ioStartTime = System.nanoTime();
                    needsToSelectAgain = false;
                    if (selectedKeys != null) {
                        processSelectedKeysOptimized(selectedKeys.flip());
                    } else {
                        processSelectedKeysPlain(selector.selectedKeys());
                    }
                    final long ioTime = System.nanoTime() - ioStartTime;
                    final int ioRatio = this.ioRatio;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    //..........省略无关代码
                } catch (Throwable t) {
                   //...........省略无关代码
                }
            }
        }
    

    在上篇文章主要提的是runAllTasks这个方法,主要执行非IO任务
    这里主要是来说明下IO任务,selectedKeys不为空
    if (selectedKeys != null) {
    processSelectedKeysOptimized(selectedKeys.flip());
    } else {
    processSelectedKeysPlain(selector.selectedKeys());
    }
    执行processSelectedKeysOptimized(selectedKeys.flip());

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
            for (int i = 0;; i ++) {
                final SelectionKey k = selectedKeys[i];
                if (k == null) {
                    break;
                }
                final Object a = k.attachment();
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
                if (needsToSelectAgain) {
                    selectAgain();
                    selectedKeys = this.selectedKeys.flip();
                    i = -1;
                }
            }
        }
    

    注意的地方就是 final Object a = k.attachment();这个attachment是从哪里来的,看如下selectionKey =javaChannel().register(eventLoop().selector, 0, this);

      protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                    return;
                } catch (CancelledKeyException e) {
                   
                }
            }
        }
    
    • 对于当server启动时,由于该当前对象是NioServerSocketChannel
    • 对于当连接进来是,由于当前对象是NioSocketChannel

    在注册整个Selector选择器的时候,把当前通道(Channel)也注册进去了,上面那个劳民伤财其实是句玩笑,在这里体现出两次注册的用意来了

    继续以上processSelectedKeysOptimized,其中processSelectedKey就是处理网络Io事件,把该事件发给Acceptor的主要触发点,而有点要

    代码如下:

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        
            int readyOps = -1;
            try {
                readyOps = k.readyOps();
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        return;
                    }
                }
                //........省略无关代码
            } catch (CancelledKeyException e) {
               //.......省略无关代码
            }
        }
    

    而组装channel,bytebuffer等网络数据是在NioMessageUnsafe#read()中

            public void read() {
                //... 省略无关代码
                try {
                    for (;;) {
                        int localRead = doReadMessages(readBuf);
                        // .......省略无关代码
                    }
                } catch (Throwable t) {
                }
                for (int i = 0; i < readBuf.size(); i ++) {
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                pipeline.fireChannelReadComplete();
                 // ..... 省略无关代码
            }
        }
    
    • doReadMessages(readBuf);
      把当前请求连接封装成一个Channel(其实就是NioSocketChannel)
    • pipeline.fireChannelRead(readBuf.get(i));
      通知Acceptor来读取,其实就是通知ServerBootstrapAcceptor#channelRead

    相关文章

      网友评论

        本文标题:Nety线程模型2(Accept篇)

        本文链接:https://www.haomeiwen.com/subject/axfgsttx.html