美文网首页大数据程序员我爱编程
Netty源码分析系列1:BootStrap的分析

Netty源码分析系列1:BootStrap的分析

作者: maskwang520 | 来源:发表于2018-05-13 21:04 被阅读215次

    Bootstrap 是 Netty 提供的一个便利的工厂类, 我们可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化.利用BootStrap我们可以实现创建channel,把channel注册在EventLoop上,发起连接等功能.
    BootStrap的类结构如下:


    image.png
    1. Client端启动实例

    下面是个简单的客户端实例,我们用这个来分析BootStrap的整个流程.

    public class Client {
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
        static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    
        public static void main(String[] args) throws Exception {
    
            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
    
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new MyProtocolEncoder());
                                ch.pipeline().addLast(new ClientHandler());
                            }
                        });
    
                ChannelFuture future = b.connect(HOST, PORT).sync();
                future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
    }
    
    2. group()
    public B group(EventLoopGroup group) {
            if (group == null) {
                throw new NullPointerException("group");
            }
            if (this.group != null) {
                throw new IllegalStateException("group set already");
            }
            this.group = group;  //设置成员变量group 为传入的group
            return self();
        }
    
    • 这里设置EventLoopGroup,是为了以后注册和handle事件做准备,EventLoopGroup可以理解成一个线程池.在后面注册和handler事件的时候,会从EventLoopGroup取线程处理.
    3. channel()
    public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    
    • 这里并不是返回channel,而是返回一个channelFactory,利用工厂方法构造channel.而下面这个则是一个channelFactory,他是根据传入的Class,通过反射构造channel.
    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    
        private final Class<? extends T> clazz;
    
        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }
     //通过返回获取channel实例
        @Override
        public T newChannel() {
            try {
                return clazz.getConstructor().newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
    }
    
    4. option()
    public <T> B option(ChannelOption<T> option, T value) {
            if (option == null) {
                throw new NullPointerException("option");
            }
            if (value == null) {   //value为null,则表示删除这个option
                synchronized (options) {
                    options.remove(option);
                }
            } else {
                synchronized (options) {
                    options.put(option, value);
                }
            }
            return self();
        }
    
    • 为Channel设置一些可选的性质.当value为null的时候表示删除这个option.
    5. handler()
    public B handler(ChannelHandler handler) {
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            this.handler = handler;
            return self();
        }
    
    • 设置handler,这里handler是用户自定义处理连接逻辑.例如编码器或者自定义的handler.通常来说我们通过ChannelInitializerinit来添加handler.
    6. connect()
     public ChannelFuture connect(String inetHost, int inetPort) {
            return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
        }
    
    
     public ChannelFuture connect(SocketAddress remoteAddress) {
            if (remoteAddress == null) {
                throw new NullPointerException("remoteAddress");
            }
            //检验各个part是否准备好
            validate();
            return doResolveAndConnect(remoteAddress, config.localAddress());
        }
    
    • 先验证各个part是否准备好,然后再发起连接.
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister(); //1
            final Channel channel = regFuture.channel();  //获取channel
    
            if (regFuture.isDone()) { //异步的结果返回
                if (!regFuture.isSuccess()) { //不成功
                    return regFuture;
                }
                return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
            } else {
                //异步结果还没出来,添加监听器来监听
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
    
                        Throwable cause = future.cause(); //异步结果
                        if (cause != null) {
                           //注册失败
                            promise.setFailure(cause);
                        } else {
                       // 注册成功了.
                            promise.registered();
                      // 发起连接
                            doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
    
    • 上面是整个注册,连接的逻辑.下面这部分单独把注册部分拿出来.
    final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();  //创建实例
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    channel.unsafe().closeForcibly();
                     //如果到这里还没注册channel,则强制使用GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                //如果到这里还没注册channel,则强制使用GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            //在这里异步注册Channel
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close(); //已经注册成功了
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
            return regFuture;
        }
    
    • 上面是整个注册的逻辑,采用是异步的策略,也就是说我们可以在程序中,根据监听器的结果来判断注册是否成功.
    void init(Channel channel) throws Exception {
            ChannelPipeline p = channel.pipeline();
            p.addLast(config.handler());
    
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
            }
        }
    
    • 在这里初始化channel.并向channelPipeline中添加handler.为channel设置option和Attribute
    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                                   final SocketAddress localAddress, final ChannelPromise promise) {
            try {
              //获取到该channel绑定的EventLoop
                final EventLoop eventLoop = channel.eventLoop(); 
                final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
    
                if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                  //已经解析了,或者没有办法解析.
                    doConnect(remoteAddress, localAddress, promise);
                    return promise;
                }
    
                final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    
                if (resolveFuture.isDone()) { //返回异步解析的结果
                    final Throwable resolveFailureCause = resolveFuture.cause();
    
                    if (resolveFailureCause != null) {
                        // 不能立即解析
                        channel.close();
                        promise.setFailure(resolveFailureCause);
                    } else {
                      // 成功解析,则连接
                        doConnect(resolveFuture.getNow(), localAddress, promise);
                    }
                    return promise;
                }
    
                // 没有立刻解析,则添加监听器等待解析的结果
                resolveFuture.addListener(new FutureListener<SocketAddress>() {
                    @Override
                    public void operationComplete(Future<SocketAddress> future) throws Exception {
                        if (future.cause() != null) {
                        //解析失败
                            channel.close();
                            promise.setFailure(future.cause());
                        } else {
                          // 解析成功,发起连接.
                            doConnect(future.getNow(), localAddress, promise);
                        }
                    }
                });
            } catch (Throwable cause) {
                promise.tryFailure(cause);
            }
            return promise;
        }
    
    • 以上是异步解析地址.
     private static void doConnect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            final Channel channel = connectPromise.channel();
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    //本地地址
                    if (localAddress == null) {
                        channel.connect(remoteAddress, connectPromise);
                    } else {
                        channel.connect(remoteAddress, localAddress, connectPromise);
                    }
                    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            });
        }
    
    
    • 上面这部分是真正的异步连接服务器.
    7. 总结

    通过上面的叙述,我们不难看出来,BootStrap所做的3件事.无非在这过程中,多次利用异步来获取结果.

    • 创建channel,并初始化
    • 注册channel
    • 连接到服务器

    相关文章

      网友评论

        本文标题:Netty源码分析系列1:BootStrap的分析

        本文链接:https://www.haomeiwen.com/subject/rknldftx.html