美文网首页小白架构师之路
Netty线程模型1(Bind篇)

Netty线程模型1(Bind篇)

作者: linking12 | 来源:发表于2016-08-22 00:05 被阅读766次

    Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
    作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建
    本人从源码的角度来分析及讨论下Netty,抛砖引玉

    原生的NIO服务端
    public class NIOServer {
        //通道管理器
        private Selector selector;
        public void initServer(int port) throws IOException {
            // 获得一个ServerSocket通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // 设置通道为非阻塞
            serverChannel.configureBlocking(false);
            // 将该通道对应的ServerSocket绑定到port端口
            serverChannel.socket().bind(new InetSocketAddress(port));
            // 获得一个通道管理器
            this.selector = Selector.open();
            //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
            //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        public static void main(String[] args) throws IOException {
            NIOServer server = new NIOServer();
            server.initServer(8000);
        }
    }
    
    

    从原生的Nio启动逻辑来看
    如果要实现Nio的服务端必须经历以下阶段:
    Channel.open->configureBlocking(false)->bind->Selctor.open()->register
    而Netty也不列外,但是他是怎么做的呢?

    Netty的NIO服务端
       // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new EchoServerHandler());
                 }
             });
            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    
    启动状态图
    Netty Nio Server Start.png

    Netty的线程模型就是多线程的Reactor模式,有MainReactor和SubReactor及Acceptor三种角色来组成其线程模型(下一章节将详细介绍此模型)
    其中MainReactor和SubReactor都体现在EventLoopGroup这个线程组中,启动主要也是MainReactor绑定线程进行启动,本文以NIO为例详细说明启动状态图的一些细节,主要讲解MainReactor的角色定位及状态流转

    NioEventLoopGroup的实现及扭转

    NioEventLoopGroup在Bootstrap初始化时作为参数传入构造方法
    其经历
    1.AbstractBootstrap.bind(port)
    2.AbstractBootstrap.initAndRegister()
    3.ServerBootstrap.init(Channel channel)
    4.NioEventLoopGroup.register(Channel channel, ChannelPromise promise)
    其中2和3都是初始化整个Channel出来为后面的4阶段进行做准备的,而4阶段是将Channel与原生的Nio的ServerSocketChannel进行映射并实现Nio服务端的打开通道,设置非阻塞,在通道上设置事件选择器这个主要工作
    所以主要启动逻辑在4阶段
    跟随代码逻辑会发现最后会调用到AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)中,其主要代码如下:

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
               //......省略无关代码
                AbstractChannel.this.eventLoop = eventLoop;
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                         // ...... 省略无关代码
                    }
                }
            }
    

    eventLoop.inEventLoop()是判断当前线程是否为EventLoop线程, 此时当前线程还是我们的main线程, bossEventLoop线程还没有启动,所以会走到else分支调用eventLoop.execute(),在SingleThreadEventExecutor的execute方法中会判断当前线程是否为eventLoop如果不是, 则启动当前eventLoop线程,如下

     @Override
        public void execute(Runnable task) {
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread(); //启动NioEventLoop线程
                addTask(task); //将当前的注册任务添加到延迟队列中
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
            if (!addTaskWakesUp) {
                wakeup(inEventLoop);
            }
        }
    

    所以主要是启动NioEventLoop线程
    当走到此时,整个NioEventLoop线程已经启动,启动主要执行run()方法,如下:

    protected void run() {
            for (;;) {
                    // ......省略无关代码
                    if (selectedKeys != null) {
                        //执行IO任务
                        processSelectedKeysOptimized(selectedKeys.flip()); 
                    } 
                    final long ioTime = System.nanoTime() - ioStartTime;
                    final int ioRatio = this.ioRatio;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //执行非IO任务,如注册通道、绑定端口等,与Io任务的时间轮占比是1:1,可配置
                    // .....省略无关代码
                } catch (Throwable t) {
                    // .....省略无关代码
                }
            }
        }
    

    通过了这一步,整个Boss线程就已经启动起来,该线程主要执行了几类任务

    1. 典型的Io任务,由processSelectedKeysOptimized触发,本阶段不会触发
    2. 注册通道的任务,由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发
    3. 绑定端口的任务,由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发
    4. 通知通道注册成功的任务,也是由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发

    其中2.3.4都是通过从延迟队列中拉取出来执行
    2任务主要是通道注册并会触发3.4任务

    private void register0(ChannelPromise promise) {
                try {
                    doRegister();
                    registered = true;
                    promise.setSuccess();//触发回调来将绑定端口的任务提交到队列,3任务
                    pipeline.fireChannelRegistered();//通知注册成功,4任务
                    if (isActive()) {
                        pipeline.fireChannelActive();
                    }
                } catch (Throwable t) {
                }
            }
    

    详细可以跟随启动状态图进行查看源码

    在整个里面并没有对Netty与原生的Nio的流程怎么对应起来进行说明

    • channel(NioServerSocketChannel.class)
      设置成new ReflectiveChannelFactory<C>(NioServerSocketChannel.class)
      通过反射创建一个NioServerSocketChannel对象,通过无参构造函数创建Nio
    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
        private final Class<? extends T> clazz;
        @Override
        public T newChannel() {
            try {
                return clazz.newInstance(); //反射调用无参构造,如下
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
    }
     public NioServerSocketChannel() {
            super(null, newSocket(), SelectionKey.OP_ACCEPT); //调用父类设置为无阻塞通道
            config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
        }
       private static ServerSocketChannel newSocket() {
            try {
                return ServerSocketChannel.open();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    
    • AbstractNioChannel#doRegister先把监听事件注册为0, 在注册完之后的修改为OP_ACCEPT
    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    //........
                }
            }
        }
    
    • DefaultChannelPipeline#fireChannelActive,注意以下read可不是一个读数据的 inbound event, 他是一个outbound event, 是"开始读"的意思,
     public ChannelPipeline fireChannelActive() {
            head.fireChannelActive();
            if (channel.config().isAutoRead()) {
                channel.read();  //注意这个read可不是一个读数据的 inbound event, 他是一个outbound event, 是"开始读"的意思, 这个event在pipeline中从tail开始溜达最终会溜达到head的read()方法
            }
            return this;
        }
       public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
       }
    protected void doBeginRead() throws Exception {
       
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) { // 这里把监听事件从0改为了ACCEPT
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    

    相关文章

      网友评论

      本文标题:Netty线程模型1(Bind篇)

      本文链接:https://www.haomeiwen.com/subject/gkkurttx.html