[Dubbo source code] 21. Service consumer: creating and starting the Netty client

Author: 天还下着毛毛雨 | Published: 2021-07-26 21:11

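Preface:

If the protocol you configure is dubbo (the default), services communicate with each other over TCP using Netty.

The service consumer therefore uses Netty to open long-lived TCP connections at startup, in preparation for sending remote calls to the provider and receiving the return values.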

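Creating and starting the Netty client

As covered earlier, the @Reference annotation injects a proxy instance for the remote service. Under the hood this means creating a DubboInvoker object and wrapping it in several layers, such as the ClusterInvoker for cluster fault tolerance and the various Invoker Filters. While the innermost DubboInvoker is being created, the Netty client is initialized and a connection is initiated.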

With the url, the call reaches getClients() in the current class (DubboProtocol):

private ExchangeClient[] getClients(URL url) {
    // whether to share connection
    boolean service_share_connect = false;
    // read the "connections" parameter: how many long-lived connections the client opens to the server
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // if not configured, connection is shared, otherwise, one connection for one service
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        // not configured: reuse the shared long-lived connection
        if (service_share_connect) {
            clients[i] = getSharedClient(url);
        } else {
            // create a dedicated connection
            clients[i] = initClient(url);
        }
    }
    return clients;
}
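
To make the branching above concrete, here is a minimal sketch of just the decision driven by the connections parameter. ConnectionPlan and its fields are hypothetical names made up for illustration; they are not Dubbo classes, and the real method goes on to obtain ExchangeClient objects.

```java
// Hypothetical sketch, not Dubbo code: models only the "connections" decision.
final class ConnectionPlan {
    final boolean shared; // true when the service reuses a shared connection
    final int count;      // number of client slots to fill

    private ConnectionPlan(boolean shared, int count) {
        this.shared = shared;
        this.count = count;
    }

    // configuredConnections is the value of the "connections" parameter, 0 when unset.
    static ConnectionPlan from(int configuredConnections) {
        if (configuredConnections == 0) {
            // not configured: one slot, filled with the shared client
            return new ConnectionPlan(true, 1);
        }
        // configured: that many dedicated clients
        return new ConnectionPlan(false, configuredConnections);
    }
}
```

Under these assumptions, ConnectionPlan.from(0) describes one shared slot, while ConnectionPlan.from(3) describes three dedicated ones.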

Focus on initClient(url), which uses the static method Exchangers.connect to return an ExchangeClient object:

private ExchangeClient initClient(URL url) {

    // client type setting.
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO is not allowed since it has severe performance issue.
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
        // connection should be lazy
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // not lazy: connect right away; requestHandler is the innermost handler, invoked last
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}

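The second argument of Exchangers.connect(url, requestHandler) is a member field of DubboProtocol. The part that matters most is its reply method, which, as the listing below shows, handles incoming Invocation messages (including callback invocations) and dispatches them to the matching Invoker.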

 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    @Override
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
     if (message instanceof Invocation) {
         Invocation inv = (Invocation) message;
         Invoker<?> invoker = getInvoker(channel, inv);
         // need to consider backward-compatibility if it's a callback
         if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
             String methodsStr = invoker.getUrl().getParameters().get("methods");
             boolean hasMethod = false;
             if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                 hasMethod = inv.getMethodName().equals(methodsStr);
             } else {
                 String[] methods = methodsStr.split(",");
                 for (String method : methods) {
                     if (inv.getMethodName().equals(method)) {
                         hasMethod = true;
                         break;
                     }
                 }
             }
             if (!hasMethod) {
                 logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                         + " not found in callback service interface ,invoke will be ignored."
                         + " please update the api interface. url is:"
                         + invoker.getUrl()) + " ,invocation is :" + inv);
                 return null;
             }
         }
         RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
         return invoker.invoke(inv);
     }
     throw new RemotingException(channel, "Unsupported request: "
             + (message == null ? null : (message.getClass().getName() + ": " + message))
             + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
     if (message instanceof Invocation) {
         reply((ExchangeChannel) channel, message);
     } else {
         super.received(channel, message);
     }
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
     invoke(channel, Constants.ON_CONNECT_KEY);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
     if (logger.isInfoEnabled()) {
         logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
     }
     invoke(channel, Constants.ON_DISCONNECT_KEY);
    }

    private void invoke(Channel channel, String methodKey) {
     Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
     if (invocation != null) {
         try {
             received(channel, invocation);
         } catch (Throwable t) {
             logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
         }
     }
    }

    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
     String method = url.getParameter(methodKey);
     if (method == null || method.length() == 0) {
         return null;
     }
     RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
     invocation.setAttachment(Constants.PATH_KEY, url.getPath());
     invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
     invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
     invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
     if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
         invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
     }
     return invocation;
    }
};

Inside Exchangers.connect, the SPI mechanism resolves a HeaderExchanger object, and its connect method is then called:

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // resolve the Exchanger and delegate the connect call to it
    return getExchanger(url).connect(url, handler);
}

public static Exchanger getExchanger(URL url) {
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    return getExchanger(type);
}

// the SPI lookup returns a HeaderExchanger by default
public static Exchanger getExchanger(String type) {
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
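
The name-based lookup can be pictured as a registry keyed by extension name. The sketch below is a simplified stand-in, not Dubbo's ExtensionLoader: SketchExchanger, SketchHeaderExchanger, SketchExtensionRegistry and resolve are hypothetical, and the parameter name "exchanger" and default "header" are assumptions about the values behind Constants.EXCHANGER_KEY and Constants.DEFAULT_EXCHANGER.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the Exchanger extension point.
interface SketchExchanger {
    String name();
}

final class SketchHeaderExchanger implements SketchExchanger {
    @Override
    public String name() {
        return "header";
    }
}

// Simplified name -> instance registry; NOT Dubbo's ExtensionLoader.
final class SketchExtensionRegistry {
    private final Map<String, Supplier<SketchExchanger>> factories = new HashMap<>();
    private final Map<String, SketchExchanger> instances = new HashMap<>();

    void register(String name, Supplier<SketchExchanger> factory) {
        factories.put(name, factory);
    }

    // Returns the cached instance for the name, creating it on first use.
    SketchExchanger getExtension(String name) {
        Supplier<SketchExchanger> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalStateException("No extension named " + name);
        }
        return instances.computeIfAbsent(name, key -> factory.get());
    }

    // Reads the extension name from URL-like parameters, falling back to a default.
    SketchExchanger resolve(Map<String, String> urlParameters) {
        String type = urlParameters.getOrDefault("exchanger", "header"); // assumed key and default
        return getExtension(type);
    }
}
```

The design point is that callers only ever name the extension they want; which class backs that name is decided by registration, so swapping the implementation does not touch the calling code.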

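In this class, connect creates the Netty client used for invoking services, and bind creates the Netty server used for exporting services. Here we look at connect: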

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // builds a long chain of nested handler objects, a variant of the chain-of-responsibility pattern
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

This connect method returns a HeaderExchangeClient as the outermost object; its constructor argument is in turn Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))).

Let's see what Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))) returns:

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // by default this creates the Netty client, NettyClient
    return getTransporter().connect(url, handler);
}

public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
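
The three-way choice in Transporters.connect can be sketched in isolation as follows. SketchHandler, NoOpHandler, FanOutHandler and HandlerSelection are hypothetical stand-ins for ChannelHandler, ChannelHandlerAdapter, ChannelHandlerDispatcher and the selection logic; the point is only that a single handler is passed through unchanged.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ChannelHandler.
interface SketchHandler {
    void received(String message);
}

// Stand-in for the no-op adapter used when no handler is supplied.
final class NoOpHandler implements SketchHandler {
    @Override
    public void received(String message) {
        // intentionally does nothing
    }
}

// Stand-in for a dispatcher that forwards each event to every delegate.
final class FanOutHandler implements SketchHandler {
    private final List<SketchHandler> delegates;

    FanOutHandler(SketchHandler... delegates) {
        this.delegates = Arrays.asList(delegates);
    }

    @Override
    public void received(String message) {
        for (SketchHandler delegate : delegates) {
            delegate.received(message);
        }
    }
}

final class HandlerSelection {
    // Mirrors the branches: none -> no-op, one -> as-is, many -> fan-out.
    static SketchHandler select(SketchHandler... handlers) {
        if (handlers == null || handlers.length == 0) {
            return new NoOpHandler();
        }
        if (handlers.length == 1) {
            return handlers[0];
        }
        return new FanOutHandler(handlers);
    }
}
```

Since HeaderExchanger.connect passes exactly one handler (the DecodeHandler), the middle branch is the one taken on this path.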

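Through the SPI mechanism, the Transporter resolved here is a NettyTransporter, and its connect method returns a NettyClient object. Because exactly one handler was passed in, the handlers.length == 1 branch applies and the handler is used as-is, without a ChannelHandlerDispatcher. So what is finally returned is NettyClient(url, DecodeHandler(HeaderExchangeHandler(requestHandler))).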

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

The NettyClient constructor wraps the ChannelHandler it holds once more.

The NettyClient constructor:

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    // wrapChannelHandler is the core logic
    super(url, wrapChannelHandler(url, handler));
}

Let's see what wrapChannelHandler(url, handler) returns:

protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
    // the ChannelHandler is wrapped in several more layers here
    return ChannelHandlers.wrap(handler, url);
}

ChannelHandlers.wrap(ChannelHandler handler, URL url) first obtains the ChannelHandlers instance and then calls its wrapInternal method:

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    // wrap the ChannelHandler; the default Dispatcher is AllDispatcher
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

So the ChannelHandler that the NettyClient constructor passes to its parent constructor has become a long chain:

NettyClient(url, MultiMessageHandler(HeartbeatHandler(AllDispatcher.dispatch(DecodeHandler(HeaderExchangeHandler(requestHandler))))))
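
A minimal sketch of how such nested wrappers behave at run time, assuming each layer does its own work and then delegates inward. ChainHandler, TracingWrapper and ChainDemo are hypothetical; the layer names are taken from the wrappers constructed in HeaderExchanger.connect and wrapInternal above, and "DispatchedHandler" is a placeholder for whatever the Dispatcher returns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler interface; the trace list records the call order.
interface ChainHandler {
    void received(String message, List<String> trace);
}

// A wrapper that records its own name, then delegates to the wrapped handler.
final class TracingWrapper implements ChainHandler {
    private final String name;
    private final ChainHandler next;

    TracingWrapper(String name, ChainHandler next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void received(String message, List<String> trace) {
        trace.add(name);
        next.received(message, trace);
    }
}

final class ChainDemo {
    // Builds the chain outermost-first and returns the order in which layers ran.
    static List<String> run() {
        ChainHandler business = (message, trace) -> trace.add("requestHandler");
        ChainHandler chain =
                new TracingWrapper("MultiMessageHandler",
                        new TracingWrapper("HeartbeatHandler",
                                new TracingWrapper("DispatchedHandler", // placeholder name
                                        new TracingWrapper("DecodeHandler",
                                                new TracingWrapper("HeaderExchangeHandler", business)))));
        List<String> trace = new ArrayList<>();
        chain.received("ping", trace);
        return trace;
    }
}
```

In this sketch the outermost wrapper always runs first and the business handler last, which is why the order of construction determines the order of processing.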

Starting the Netty client

The constructor of AbstractClient, the parent class of NettyClient:
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);

    try {
        // open the Netty client
        doOpen();
    } catch (Throwable t) {
        close();
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }
    try {
        // connect to the Netty server
        connect();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
        }
    } catch (Exception t) {
        // exception handling omitted
    }
    // some code omitted
}
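
The constructor follows the template-method pattern: the base class fixes the order of the steps and subclasses supply them. Below is a minimal sketch under simplifying assumptions: SketchAbstractClient and SketchNettyLikeClient are hypothetical, the sketch calls a doConnect hook directly instead of going through connect(), and its handling of a failed connect is invented because the listing above omits that branch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class modelling only the "open, then connect" ordering.
abstract class SketchAbstractClient {
    // Initialized before the constructor body runs, so subclass hooks can use it.
    protected final List<String> events = new ArrayList<>();

    protected SketchAbstractClient() {
        try {
            doOpen(); // hook: prepare the client, no network traffic yet
        } catch (Exception e) {
            close();
            throw new IllegalStateException("Failed to start client", e);
        }
        try {
            doConnect(); // hook: actually connect to the server
        } catch (Exception e) {
            // Assumption: this sketch just wraps and rethrows.
            throw new IllegalStateException("Failed to connect", e);
        }
    }

    protected abstract void doOpen() throws Exception;

    protected abstract void doConnect() throws Exception;

    void close() {
        events.add("close");
    }
}

// Hypothetical subclass that only records which hooks were called.
final class SketchNettyLikeClient extends SketchAbstractClient {
    @Override
    protected void doOpen() {
        events.add("doOpen");
    }

    @Override
    protected void doConnect() {
        events.add("doConnect");
    }
}
```

One consequence of calling hooks from a constructor is that they run before the subclass's own field initializers, so a hook should only rely on state that the base class has already set up.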
NettyClient.doOpen()

The constructor first calls doOpen(), which dispatches to the subclass implementation and sets up the Netty client:

@Override
protected void doOpen() throws Throwable {
    // the core handler; it holds the NettyClient instance
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(NioSocketChannel.class);

    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("handler", nettyClientHandler);
        }
    });
}

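This creates Netty's Bootstrap object and, most importantly, a NettyClientHandler, which holds the current NettyClient instance and is registered in Netty's pipeline.

NettyClientHandler implements both the ChannelInboundHandler and ChannelOutboundHandler interfaces, so every read and write on the channel during communication goes through NettyClientHandler's implementations of those interfaces' methods.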

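NettyClient.doConnect(): initiating the connection

After the parent class AbstractClient has called the subclass's doOpen method and the Netty client is set up, it calls its own connect():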

protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        if (isConnected()) {
            return;
        }
        // start a timer that reconnects after a dropped TCP connection; it checks whether the Netty channel is missing or already closed
        initConnectStatusCheckCommand();
        // dispatch to the subclass
        doConnect();
        if (!isConnected()) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", channel is " + this.getChannel());
            }
        }
        reconnect_count.set(0);
        reconnect_error_log_flag.set(false);
    } catch (RemotingException e) {
        throw e;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                + ", cause: " + e.getMessage(), e);
    } finally {
        connectLock.unlock();
    }
}
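
The lock-then-check shape of connect() makes repeated calls harmless. A minimal sketch, assuming a connect hook that always succeeds: GuardedConnector, drop and attempts are hypothetical names, and the reconnect timer and logging of the real method are left out.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a lock-guarded, idempotent connect.
final class GuardedConnector {
    private final Lock connectLock = new ReentrantLock();
    private volatile boolean connected;
    private int attempts; // guarded by connectLock

    void connect() {
        connectLock.lock();
        try {
            if (connected) {
                return; // already connected: nothing to do
            }
            attempts++;
            connected = doConnect();
            if (!connected) {
                throw new IllegalStateException("Failed to connect");
            }
        } finally {
            connectLock.unlock();
        }
    }

    // Stand-in for the subclass hook; always succeeds in this sketch.
    private boolean doConnect() {
        return true;
    }

    // Simulates the connection being dropped.
    void drop() {
        connected = false;
    }

    boolean isConnected() {
        return connected;
    }

    int attempts() {
        connectLock.lock();
        try {
            return attempts;
        } finally {
            connectLock.unlock();
        }
    }
}
```

Calling connect() twice in a row performs one real attempt; only after drop() does the next connect() try again, which is the property a periodic reconnect check relies on.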

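NettyClient.doConnect()
doConnect calls Netty's bootstrap.connect, which returns a ChannelFuture, and blocks until the connection result is available or the connect timeout elapses. On success it checks whether a channel already existed and closes it if so, then stores the newly connected channel in the current object's channel field.

The channel is kept so that later reads and writes can go through the Netty channel.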

@Override
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    // initiate the connection
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        // block until the connection result is available or the timeout elapses
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

        // the connection succeeded
        if (ret && future.isSuccess()) {
            Channel newChannel = future.channel();
            try {
                // Close old channel
                // if a previous channel exists, close it
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    // on success, store the new channel in this client's channel field
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.cause() != null) {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
        } else {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        if (!isConnected()) {
            //future.cancel(true);
        }
    }
}
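
The channel hand-over inside the success branch can be reduced to a few lines. This is a sketch under stated assumptions, not Netty or Dubbo code: SketchChannel and ChannelHolder are hypothetical, closing is synchronous, and the channel cache cleanup (removeChannelIfDisconnected) is left out.

```java
// Hypothetical channel with just enough state to observe closing.
final class SketchChannel {
    private boolean open = true;

    void close() {
        open = false;
    }

    boolean isOpen() {
        return open;
    }
}

// Hypothetical holder modelling how the client swaps its current channel.
final class ChannelHolder {
    private SketchChannel channel;
    private boolean closed;

    // Called when a new connection has been established.
    void onConnected(SketchChannel newChannel) {
        SketchChannel oldChannel = channel; // copy reference
        if (oldChannel != null) {
            oldChannel.close(); // never keep two live channels
        }
        if (closed) {
            // the client was closed in the meantime: discard the new channel too
            newChannel.close();
            channel = null;
        } else {
            channel = newChannel;
        }
    }

    void close() {
        closed = true;
        if (channel != null) {
            channel.close();
        }
    }

    SketchChannel channel() {
        return channel;
    }
}
```

The check on the closed flag covers the race where the client is shut down while a connect is still in flight, so a late-arriving channel is not left open.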

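At this point the Netty client used for service invocation has started and connected successfully; it holds a channel and can communicate over TCP.

Looking back, the whole creation process forms one long chain.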

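Everything from NettyClient inward is wrapped by NettyClientHandler, which is placed in the handler chain of the Netty pipeline. During communication, NettyClientHandler's read and write callbacks are invoked first, and then Dubbo's own logic runs, such as heartbeats, encoding and decoding, and batched (multi-message) responses.

To summarize: one remote service reference creates multiple Invoker objects, and each Invoker, based on the same url, creates one or more (depending on configuration) long-lived Netty client connections.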

Article link: https://www.haomeiwen.com/subject/yengmltx.html