美文网首页
Netty源码-客户端启动过程

Netty源码-客户端启动过程

作者: persisting_ | 来源:发表于2019-06-09 08:10 被阅读0次

    1 概述

    在介绍了Netty服务端启动之后(参考笔者文章Netty源码-服务端启动过程),再看Netty的客户端启动会发现二者十分类似,服务端启动通过调用了ServerBootstrap.bind方法开启,而客户端启动则通过调用Bootstrap.connect方法启动。

    本文的介绍比较简单,因为许多操作和服务端启动一致,就没有详细介绍,读者可集合服务端启动过程一起理解。

    2 客户端的典型编码

    和介绍服务端一样,我们先看一下客户端的典型编码:

    public class TimeClient {
        public void connect(int port, String host) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                //客户端只要准备一个group即可
                b.group(group)
                //注册客户端使用的channel
                .channel(NioSocketChannel.class)
                //设置客户端channel选项和属性
                .option(ChannelOption.TCP_NODELAY, true)
                .attr(AttributeKey.valueOf("attrKey"), "attrValue")
                //注册客户端pipelne中的handler
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
                //调用connect启动客户端
                ChannelFuture f = b.connect(host, port).sync();
            } finally {
                //优雅停机
                group.shutdownGracefully();
            }
        }
        public static void main(String[] args) throws Exception {
            new TimeClient().connect(8080, "127.0.0.1");
        }
    
    }
    

    3 一些配置函数

    在第1节我们提到了BootstrapServerBootstrap类,Bootstrap主要负责客户端的启动,而ServerBootstrap则主要负责服务端的启动,我们在Netty源码-服务端启动过程第3节一些配置函数介绍了ServerBootstrap的一些常用配置函数,因为ServerBootstrap即需要配置Accept线程和Server channel,又需要配置客户端连接的线程和客户端channel,所以ServerBootstrap的配置方法都是optionchildOptionhandlerchildHandler这样成对出现的,因为Bootstrap主要负责客户端启动,所以只需要配置客户端线程和channel即可,所以其配置方法则没有child*这一类。相关配置方法在文章Netty源码-服务端启动过程也都介绍过,本文也就不再介绍了。

    4 客户端启动

    客户端的启动由Bootstrap.connect方法开启,下面看其源码:

    //Bootstrap
    /**
    * Connect a {@link Channel} to the remote peer.
    */
    public ChannelFuture connect(String inetHost, int inetPort) {
        return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
    }
    
    /**
    * Connect a {@link Channel} to the remote peer.
    */
    public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        //验证一些必要配置是否都已经配置过
        validate();
        //进行地址解析和实际的连接
        return doResolveAndConnect(remoteAddress, config.localAddress());
    }
    
    @Override
    public Bootstrap validate() {
        //父类中的验证主要是保证group和channelFactory不为空
        super.validate();
        //验证设置了handler
        if (config.handler() == null) {
            throw new IllegalStateException("handler not set");
        }
        return this;
    }
    
    //客户端的启动也分为三个步骤,第一为初始化通道,第二为向
    //EventLoopGroup中的某个NioEventLoop持有的Selector注册
    //通道,第三个为解析地址和连接
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        //这里完成了第一和第二步骤,即初始化和注册
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
    
        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            //这里完成第三个步骤:解析地址和连接
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    

    4.1 Channel初始化和注册

    其实客户端Channel初始化和注册实现与服务端基本一样,在初始化时会调用AbstractBootstrap.init方法,这个方法根据在具体的子类中进行了重写,客户端的子类为Bootstrap,其实现如下:

    //Bootstrap
    //逻辑比较简单,首先向客户端channel的pipeline添加handler
    //然后进行选项和attr的设置
    @Override
    @SuppressWarnings("unchecked")
    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());
    
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
    
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
    

    除此之外,Channel初始化和注册与服务端一样,可参见笔者文章Netty源码-服务端启动过程4.1节相关内容,这里不再介绍。

    doResolveAndConnect0方法主要完成第三个步骤,解析服务器地址并连接:

    //Bootstrap
    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            //首先进行域名解析(如果指定的服务端地址时域名而不是
            //IP时需要进行解析)
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
    
            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                //解析成功之后进行连接操作
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }
    
            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    
            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();
    
                if (resolveFailureCause != null) {
                    // Failed to resolve immediately
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }
    
            // Wait until the name resolution is finished.
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }
    

    4.2 地址解析和连接

    在完成第一和第二个步骤之后,通道初始化和注册都已经完成,Bootstrap就会调用doResolveAndConnect0方法解析服务器地址并连接。

    4.2.1 地址解析

    因为我们在指定服务端地址不仅可以使用IP,还可以使用域名,所以我们在指定域名时,就需要Netty将其解析为IP地址,实现逻辑也比较简单,默认的解析器为DefaultNameResolver,根据域名解析出IP调用的方法为java.net.InetAddress.getAllByName(hostname),这里也不再展开介绍。

    4.2.2 连接

    在将域名(如果配置的服务端地址为域名而不是IP时会进行解析操作)解析为IP之后,会调用doConnect进行连接操作:

    //Bootstrap
    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    //直接调用通道的connect方法
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }
    

    从上面的源码可以看出,连接操作直接通过调用Channel.connect方法完成,Channel.connect方法我们直接看起子类AbstractChannel中的实现:

    //AbstractChannel
    //connect都是直接通过调用Pipeline的connect进行连接操作
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }
    
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }
    

    根据Netty源码-ChannelPipeline和ChannelHandler中的介绍,connect方法属于Outbound事件,所以最终会调用HeadContext.connect方法:

    //HeadContext
     @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }
    

    HeadContext.connect方法调用了Unsafe.connect方法,我们看其在子类AbstractNioUnsafe中的实现:

    //AbstractNioUnsafe
    @Override
    public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
    
        try {
            if (connectPromise != null) {
                // Already a connect in process.
                throw new ConnectionPendingException();
            }
    
            boolean wasActive = isActive();
            //这里调用了外部类AbstractNioChannel.doConnect
            //方法执行实际的连接动作
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
            } else {
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;
    
                // Schedule connect timeout.
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                            ConnectTimeoutException cause =
                                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
                            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                close(voidPromise());
                            }
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }
    
                promise.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            if (connectTimeoutFuture != null) {
                                connectTimeoutFuture.cancel(false);
                            }
                            connectPromise = null;
                            close(voidPromise());
                        }
                    }
                });
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }
    
    

    因为这里介绍的是客户端的启动,所以我们看AbstractNioChannel.doConnect在其子类NioSocketChannel中的实现:

    //NioSocketChannel
     @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        //如何本地地址不为空,表示要进行本地地址绑定
        if (localAddress != null) {
            doBind0(localAddress);
        }
    
        boolean success = false;
        try {
            //最终调用了java channel.connect方法
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }
    
    //本地地址绑定,调用java channel的bind方法进行绑定
     private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            SocketUtils.bind(javaChannel(), localAddress);
        } else {
            SocketUtils.bind(javaChannel().socket(), localAddress);
        }
    }
    

    相关文章

      网友评论

          本文标题:Netty源码-客户端启动过程

          本文链接:https://www.haomeiwen.com/subject/dswxxctx.html