美文网首页
netty系列之(二)——netty服务端启动分析

netty系列之(二)——netty服务端启动分析

作者: 康康不遛猫 | 来源:发表于2018-12-05 14:38 被阅读0次

    一、netty服务启动分析

    EventLoopGroup boss = new NioEventLoopGroup();//类图,继承线程池ScheduledExecutorService
    EventLoopGroup worker = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(boss, worker);
    bootstrap.channel(NioServerSocketChannel.class);//利用反射构造NioServerSocketChannel实例
    //backlog指定了内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列:未链接队列和已连接队列,根据TCP三路握手过程中三个分节来分隔这两个队列
    bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    bootstrap.handler(new LoggingServerHandler());//handler与childHandler不同
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new MyChannelHandler1());
            ch.pipeline().addLast(new MyChannelHandler2());
            ch.pipeline().addLast(new MyChannelHandler3());
        }
    });
    ChannelFuture f = bootstrap.bind(port).sync();//bind方法实现
    f.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    //启动成功
                }
    });
    f.channel().closeFuture().sync();
    
    
    图片.png
    创建 ServerBootstrap 实例
    设置并绑定 Reactor 线程池
    设置并绑定服务端 Channel
    创建并初始化 ChannelPipeline
    添加并设置 ChannelHandler
    绑定并启动监听端口
    

    二、netty服务启动代码分析

    1、创建两个EventLoopGroup

      EventLoopGroup bossGroup = new NioEventLoopGroup();
      EventLoopGroup workerGroup = new NioEventLoopGroup();
    

    bossGroup 为 BOSS 线程组,用于服务端接受客户端的连接, workerGroup 为 worker 线程组,用于进行 SocketChannel 的网络读写。当然也可以创建一个线程组,共享使用。

    2、创建ServerBootstrap实例

    ServerBootStrap为Netty服务端的启动引导类,用于帮助用户快速配置、启动服务端服务。提供的方法如下:


    图片.png

    ServerBootStrap底层采用装饰者模式。

    3、设置并绑定Reactor线程池

    b.group(bossGroup, workerGroup)
    

    EventLoopGroup 为 Netty 线程池,它实际上就是 EventLoop 的数组容器。EventLoop 的职责是处理所有注册到本线程多路复用器 Selector 上的 Channel,Selector 的轮询操作由绑定的 EventLoop 线程 run 方法驱动,在一个循环体内循环执行。通俗点讲就是一个死循环,不断的检测 I/O 事件、处理 I/O 事件。
    这里设置了两个group,这个其实有点儿像我们工作一样。需要两类型的工人,一个老板(bossGroup),一个工人(workerGroup),老板负责从外面接活,工人则负责死命干活。所以这里 bossGroup 的作用就是不断地接收新的连接,接收之后就丢给 workerGroup 来处理,workerGroup 负责干活就行(负责客户端连接的 IO 操作)。

    4、设置并绑定服务端Channel

    .channel(NioServerSocketChannel.class)
    

    调用 ServerBootstrap.channel 方法用于设置服务端使用的 Channel,传递一个 NioServerSocketChannel Class对象,Netty通过工厂类,利用反射创建NioServerSocketChannel 对象,如下:

        public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    
    //最终调用构造函数
    public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    

    5、添加并设置ChannelHandler

     .handler(new LoggingServerHandler())
    .childHandler(new ChannelInitializer(){
        //省略代码
    })
    
    图片.png

    ServerBootstrap 中的 Handler(childHandler()) 是 NioServerSocketChannel 使用的,所有连接该监听端口的客户端都会执行它,父类 AbstractBootstrap 中的 Handler 是一个工厂类,它为每一个新接入的客户端都创建一个新的 Handler。
    handler在server初始化它时就会执行,而childHandler会在客户端成功connect后才执行,这是两者的区别。

    6、绑定端口,启动服务

    ChannelFuture future = b.bind(port).sync();
    

    主要步骤:
    负责创建服务端的NioServerSocketChannel实例;
    为NioServerSocketChannel的pipeline添加handler;
    注册NioServerSocketChannel到selector;

    二、源码详解

    AbstractBootstrap类doBind方法,绑定端口入口

    private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister(); //初始化与注册
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
    private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
    }
    

    initAndRegister方法,创建和初始化channel

    final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel(); //创建服务端Channel
                init(channel);//初始化Channel
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = config().group().register(channel); //注册selector
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
     }
    
    #channelFactory.newChannel()通过反射创建实例
    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    
        private final Class<? extends T> clazz;
    
        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }
    
        @Override
        public T newChannel() {
            try {
                return clazz.newInstance();
    //clazz由AbstractBootstrap.channel方法传入,bootstrap.channel(NioServerSocketChannel.class);
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
    
        @Override
        public String toString() {
            return StringUtil.simpleClassName(clazz) + ".class";
        }
    }
    
    #AbstractBootstrap
    public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    
    #NioServerSocketChannel构造函数
    public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);//调用AbstractNioChannel构造函数
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    
    #AbstractNioChannel构造函数
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);//非阻塞方式
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    
    #init方法,初始化channel参数,添加handler
       void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {//服务端Channel属性
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();//传入hander
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
                    // In this case the initChannel(...) method will only be called after this method returns. Because
                    // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
                    // placed in front of the ServerBootstrapAcceptor.
            //默认添加的ServerBootstrapAcceptor的hander,连接接入器处理新链接接入时,初始化Options和Attrs
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    register方法,注册selector

    #unsafe类register方法
    //注册到Reactor线程的多路复用器上监听新客户端的接入
    public final void register(final ChannelPromise promise) {
                if (eventLoop.inEventLoop()) {//是否在当前eventLoop中
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {//不在当前eventLoop中,异步执行
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        promise.setFailure(t);
                    }
                }
    }
    
    #unsafe类register0方法
    private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!ensureOpen(promise)) {
                        return;
                    }
                    doRegister();
                    registered = true;
                    promise.setSuccess();
                    pipeline.fireChannelRegistered();
                    if (isActive()) {
                        pipeline.fireChannelActive();
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    if (!promise.tryFailure(t)) {
                        logger.warn(
                                "Tried to fail the registration promise, but it is complete already. " +
                                        "Swallowing the cause of the registration failure:", t);
                    }
                }
            }
    
    #AbstractNioChannel类doRegister方法
    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                   //获取selectionKey ,通过SelectionKey的interestOps(int ops)方法可以修改监听操作位,注册OP_ACCEPT(16)到多路复用器上
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    

    附录:
    ChannelOption参数说明

    1、ChannelOption.SO_BACKLOG
        ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,
        服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
    2、ChannelOption.SO_REUSEADDR
        ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,
        比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,
        比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR
        就无法正常使用该端口。
    3、ChannelOption.SO_KEEPALIVE
        Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的
        连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
    4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
        ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区
        的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
    5、ChannelOption.SO_LINGER
        ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证
        会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送
    6、ChannelOption.TCP_NODELAY
        ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关
        Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效
        负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送
    

    相关文章

      网友评论

          本文标题:netty系列之(二)——netty服务端启动分析

          本文链接:https://www.haomeiwen.com/subject/bkhanxtx.html