美文网首页程序员
Netty源码愫读(六)ServerBootstrap相关源码学

Netty源码愫读(六)ServerBootstrap相关源码学

作者: 桥头放牛娃 | 来源:发表于2018-12-22 17:31 被阅读44次

    BootStrap在netty的应用程序中负责引导服务器和客户端。netty包含了两种不同类型的引导:

    • 使用服务器的ServerBootStrap,用于接受客户端的连接以及为已接受的连接创建子通道。
    • 用于客户端的BootStrap,不接受新的连接,并且是在父通道类完成一些操作。

    本文主要分析ServerBootstarp相关源码。

    Bootstrap类继承图:

    Bootstrap类继承图.png

    1、ServerBootstarap启动流程分析

    1.1、ServerBootstarap简单启动代码

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);//用于接受accept事件的group serverSocketChannel 
    EventLoopGroup group = new NioEventLoopGroup();  //用于真正读写事件的group socketChannel
    try {
        //create ServerBootstrap instance  
        ServerBootstrap b = new ServerBootstrap();  //启动装载器,用于转载配置
        //Specifies NIO transport, local socket address  
        //Adds handler to channel pipeline  
        //使用NIO通道                                   
        //保持连接,如果不设置则一次通讯后自动断开
        b.group(bossGroup, group).channel(NioServerSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).localAddress(port)
                .handler(new SimpleServerHandler())//服务端handler
                .childHandler(new ChannelInitializer<Channel>() {  //channel使用的handler
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new EchoServerHandler2(), new EchoServerHandler());
                    }
                });
        //Binds server, waits for server to close, and releases resources  
        ChannelFuture f = b.bind().sync();
        System.out.println(EchoServer.class.getName() + "started and listen on �" + f.channel().localAddress());
        f.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully().sync();
    }
    

    主要处理说明:

    (1)、新建NioEventLoopGroup类型的bossGroup和group。bossGroup主要处理服务端接收客户端连接处理,group主要处理读写等I/O事件及任务等;
    (2)、创建ServerBootstrap,其主要对一些处理进行代理,如bind()等操作,其是其他类的一个简单门面;
    (3)、channel()方法设置服务端的ServerSocketChannel实现类,本处实现类为NioServerSocketChannel。
    (4)、option()方法设置Channel的相关选项,具体查看ChannelOption中的定义;
    (5)、localAddress()设置服务端绑定的本地地址及端口;
    (6)、handler()设置服务端的对应Channel的Handler;
    (7)、childHandler()设置子连接的Channel的Handler;
    (8)、bind()及sync()绑定本地地址并同步返回绑定结果;

    1.2、bind()调用流程

    bind()调用流程图:

    bind()调用流程图.png

    (1)、调用ServerBootstrap.bind():应用调用ServerBootstrap的bind()操作;
    (2)、调用AbstractBootstrap.bind():调用doBind()对进行bind操作;
    (3)、调用AbstractBootstrap.initAndRegister():利用ChannelFactory.newChannel()实例化NioServerSocketChannel;
    (4)、调用ServerBootstrap.init():对NioServerSocketChannel进行初始化,主要操作如设置Channel相关的选项及属性、设置ChannelHandler为ServerBootstrapAcceptor等,ServerBootstrapAcceptor为inbound类型的ChannelHandler,其为ServerBootstrap的内部类,其主要实现ChannelRead()操作,将客户端的连接注册到EventLoopGroup的EventLoop中。
    (5)、调用NioEventLoop.register():将NioServerSocketChannel注册到bossGroup中。
    (6)、调用AbstractBootstrap.doBind0:将实际的bind操作以任务的形式添加到bossGroup的EventLoop中。
    (7)、调用NioServerSocketChannel.bind():在EventLoop中以任务的形式调用此方法进行实际的bind()操作。

    2、主要方法源码分析

    2.1、doBind()源码分析

    doBind()源码:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    

    主要流程处理:

    • 调用initAndRegister()初始化Channel并将其注册到bossGroup中的NioEventLoop中;
    • 若注册成功,则调用doBind0()进行实际的bind操作;
    • 若还未注册,则创建注册结果的监听器及doBind0()的异步结果,若Channel注册成功,则在结果监听器中进行doBind0()操作,并将bind()异步结果这种为成功;否则将在监听器中设置异步结果为失败;

    2.2、initAndRegister()源码分析

    initAndRegister()源码:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
    
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
    
        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
    
        return regFuture;
    }
    

    主要处理流程:

    • 通过ChannelFactory新创建一个Channel;
    • 调用ServerBootstrap的init()方法对Channel进行初始化;

    2.3、init()源码分析

    init()源码:

    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
    
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
    
        ChannelPipeline p = channel.pipeline();
    
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
    
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
    
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    

    主要处理流程:

    • 如果设置了Channel选项,则调用setChannelOptions()对Channel进行选项设置;
    • 如果设置了属性,则将对应属性设置为Channel的属性;
    • 设置子Channel的选项及属性;
    • 初始化NioServerSocketChannel的ChannelHandler为ServerBootstrapAcceptor,ServerBootstrapAcceptor为inbound类型的ChannelHandler,其主要功能是将已经接受连接的子Channel注册到workerGroup的NioEventLoop中;

    2.4、doBind0()源码分析

    doBind0()源码:

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

    主要处理流程:

    • 将NioServerSocketChannel.bind()操作封装为任务,并将任务提交给其对应的EventLoop进行处理;

    2.5、ServerBootstrapAcceptor源码分析

    2.5.1、ServerBootstrapAcceptor类继承图:

    ServerBootstrapAcceptor类继承图.png

    ServerBootstrapAcceptor为NioServerSocketChannel的ChannelHandler,其类型为Inbound类型;

    2.5.2、ServerBootstrapAcceptor源码:

    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    
        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;
    
        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
    
            // Task which is scheduled to re-enable auto-read.
            // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
            // not be able to load the class because of the file limit it already reached.
            //
            // See https://github.com/netty/netty/issues/1328
            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
    
            child.pipeline().addLast(childHandler);
    
            setChannelOptions(child, childOptions, logger);
    
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
    
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    
        private static void forceClose(Channel child, Throwable t) {
            child.unsafe().closeForcibly();
            logger.warn("Failed to register an accepted channel: {}", child, t);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // stop accept new connections for 1 second to allow the channel to recover
                // See https://github.com/netty/netty/issues/1328
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
            }
            // still let the exceptionCaught event flow through the pipeline to give the user
            // a chance to do something with it
            ctx.fireExceptionCaught(cause);
        }
    }
    

    ServerBootstrapAcceptor主要实现了以下方法:
    (1)、channelRead():设置子连接的ChannelHandler、设置子连接的Channel选项,设置子连接的Channel属性,将子连接注册的child对应的EventLoop中(即workerGroup的EventLoop中);
    (2)、exceptionCaught():若ServerSocketChannel在accept子连接时抛出异常,若ServerSocketChannel的autoRead为true,则设置其为false,即不允许自动接收客户端连接,并延迟1s后再设置其为true,使其允许自动接收客户端连接;

    相关阅读:
    Netty源码愫读(一)ByteBuf相关源码学习 【https://www.jianshu.com/p/016daa404957
    Netty源码愫读(二)Channel相关源码学习【https://www.jianshu.com/p/02eac974258e
    Netty源码愫读(三)ChannelPipeline、ChannelHandlerContext相关源码学习【https://www.jianshu.com/p/be82d0fcdbcc
    Netty源码愫读(四)ChannelHandler相关源码学习【https://www.jianshu.com/p/6ee0a3b9d73a
    Netty源码愫读(五)EventLoop与EventLoopGroup相关源码学习【https://www.jianshu.com/p/05096995d296

    参考书籍:

    《Netty权威指南》第二版

    相关文章

      网友评论

        本文标题:Netty源码愫读(六)ServerBootstrap相关源码学

        本文链接:https://www.haomeiwen.com/subject/myxwkqtx.html