美文网首页
Dubbo 服务引用过程(二)

Dubbo 服务引用过程(二)

作者: 此鱼不得水 | 来源:发表于2017-12-19 19:55 被阅读165次

    在上一章中讲到了dubbo引用过程的第一步,在zk上注册和订阅。本章将接着上一章继续讲解之后的服务引用的步骤:

    记得在上一章中我们看到了这段代码:

        if (enabled) {
            invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
        }
    
        //InvokerDelegete中包装的Invoker实际上是DubboInvoker
        public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            // create rpc invoker,这里的url对应的地址和接口否是provider信息,parameters是providerUrl和consumerUrl的汇总信息
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
        
        /**
         * 此代码负责NIO框架Client创建及初始化,和提供者初始化相似,消费者的
         * 初始化也基本上围绕着Client、Handler、Channel对象的创建与不断装饰
         * 的过程,不同的是消费者底层是与提供者Server建立连接,此过程在remote
         * 层下完成,remote层可分为消息交换层与网路传输层即Exchanger与
         * Transporter层,还有,我们在说提供者初始化时,说过同个JVM中相同协议
         * 的服务共享一个Server,同样在消费者初始化时,引用同一个提供者的所有
         * 服务可以共享一个Client进行通信,这也就实现了Server-Client在同一个
         * 通道中进行通信,实现长连接的高效通信,但是在服务请求数据量比较大时
         * 或请求数比较多时,可以设置每服务每连接或每服务多连接可以提高通信效
         * 率,具体是通过消费者方connections=2设置连接数。所有消费者端Client
         * 有两种,一种是共享型Client,一种是创建型Client,当然共享型Client
         * 属于创建型Client一部分,下面具体说说这两种Client创建的细节,也是服
         * 务引用的重要细节
         */
        private ExchangeClient[] getClients(URL url){
            //是否共享连接
            boolean service_share_connect = false;
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            //如果connections不配置,则共享连接,否则每服务每连接
            if (connections == 0){
                service_share_connect = true;
                connections = 1;
            }
            //有多少个连接就创建多少个Client
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect){ //如果是共享连接的话(客户端只创建一个Client)
                    clients[i] = getSharedClient(url);
                } else {
                    clients[i] = initClient(url); //这个初始化Client很重要
                }
            }
            return clients;
        }
        
        /**
         * 获取共享连接
         * 此为创建共享型Client,共享型Client是指消费者引用同一
         * 提供者的服务时,使用同一个Client来提高通信效率
         */
        private ExchangeClient getSharedClient(URL url){
            String key = url.getAddress(); //消费者的地址
            ReferenceCountExchangeClient client = referenceClientMap.get(key);
            if ( client != null ){
                if ( !client.isClosed()){
                    //如果这个共享client不等于null并且这个client没有关闭的话
                    //将client内部的引用计数加1
                    client.incrementAndGetCount();
                    return client;
                } else {
                    //虽然这个client不为空,client确实关闭状态的
                    //于是要移除这个被关闭的client
                    referenceClientMap.remove(key);
                }
            }
            //这个client内部已经建立了与服务端的特定端口号的连接
            ExchangeClient exchagneclient = initClient(url);
            //将client设置为幽灵Client,作用目前还不明确
            client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            return client; 
        }
        
        /**
         * 创建客户端新连接.
         */
        private ExchangeClient initClient(URL url) {
            //Client类型设置
            String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
            String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
            boolean compatible = (version != null && version.startsWith("1.0."));
            url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
            //默认开启heartbeat
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            // BIO存在严重性能问题,暂时不允许使用
            if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
            ExchangeClient client ;
            try {
                //延迟加载
                if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
                    client = new LazyConnectExchangeClient(url ,requestHandler);
                } else {
                    //在这里我们又看到这个Exchangers的门面了,这个要十分注意,因为在这之前讲解服务启动的时候有讲过这个,这个跟上次说的门面十分类似。
                    client = Exchangers.connect(url ,requestHandler);
                }
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url
                        + "): " + e.getMessage(), e);
            }
            return client;
        }
    

    上面提到过在包装的过程中,会初始化与服务端的连接,这一步是在NettyClient中完成的,实际上client = Exchangers.connect(url ,requestHandler);最后返回的实际类型也就是nettyClient,在NettyClient的构造函数中直接完成了与provider链接的建立,具体如下:

        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
            //wrapChannelHandler(url, handler)操作即返回了后面提到的MultiMessageHandler
            super(url, wrapChannelHandler(url, handler));
        }
        
        //super(url, wrapChannelHandler(url, handler))
        public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
            //见面我简化了代码,只写了核心的逻辑
            super(url, handler);
            doOpen();
            connect();
        }
        
        //doOpen和connect都是调用子类NettyClient的方法
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            bootstrap = new ClientBootstrap(channelFactory);
            // 设置全局的链接配置
            // @see org.jboss.netty.channel.socket.SocketChannelConfig
            bootstrap.setOption("keepAlive", true);
            bootstrap.setOption("tcpNoDelay", true);
            bootstrap.setOption("connectTimeoutMillis", getTimeout());
            //nettyHandler持有了NettyClient,NettyClient持有了上面说的多层Handler,最终还是委托到持有的Handler来处理
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
        }
        
        
        // 初始化或者重新覆盖channel
        protected void doConnect() throws Throwable {
            long start = System.currentTimeMillis();
            //直接建立与provider的链接,建立链接之后就可以调用channel.write发送信息到provider
            ChannelFuture future = bootstrap.connect(getConnectAddress());
            try{
                //链接建立的超时时间默认为3s
                boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
                
                if (ret && future.isSuccess()) {
                    Channel newChannel = future.getChannel();
                    newChannel.setInterestOps(Channel.OP_READ_WRITE);
                    try {
                        // 关闭旧的连接,因为一个NettyClient就对应一个链接,所以当创建新的链接的时候就代表要移除旧的链接
                        Channel oldChannel = NettyClient.this.channel; // copy reference
                        if (oldChannel != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                                }
                                oldChannel.close();
                            } finally {
                                NettyChannel.removeChannelIfDisconnected(oldChannel);
                            }
                        }
                    //完成链接替换
                    } finally {
                        if (NettyClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                                }
                                newChannel.close();
                            } finally {
                                NettyClient.this.channel = null;
                                NettyChannel.removeChannelIfDisconnected(newChannel);
                            }
                        } else {
                            NettyClient.this.channel = newChannel;
                        }
                    }
                } else if (future.getCause() != null) {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
                } else {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + " client-side timeout "
                            + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
                }
            }finally{
                if (! isConnected()) {
                    future.cancel();
                }
            }
        }
    

    上面的过程就是建立链接的过程了,但是在这个过程中有个重要的地方就是多层Handler的包装,因为装饰者模式的层级比较多,所以我们还是采用一贯的措施:层层分解。
    包装的层级依次是:

    • requestHandler
    • HeaderExchangeHandler 包装了requestHandler
    • DecodeHandler 包装了HeaderExchangeHandler
    • AllChannelHandler 包装了DecodeHandler
    • HeartbeatHandler 包装了AllChannelHandler
    • MultiMessageHandler 包装了HeartbeatHandler

    最终调用分顺序是又下至上开始调用的。

        //MultiMessageHandler
        public void received(Channel channel, Object message) throws RemotingException {
            //dubbo可以将多个返回信息包装到一个信息里去返回,这时候就就用到MultiMessageHandler了。这么做的目的的是节省传输量
            if (message instanceof MultiMessage) {
                MultiMessage list = (MultiMessage)message;
                for(Object obj : list) {
                    handler.received(channel, obj);
                }
            } else {
                handler.received(channel, message);
            }
        }
        
        //HeartbeatHandler,判断收到的信息类型是否是心跳类型,如果是心跳类型的话就不会触发后面的handler,直接在此结束请求。
        public void received(Channel channel, Object message) throws RemotingException {
            setReadTimestamp(channel);
            if (isHeartbeatRequest(message)) {
                Request req = (Request) message;
                // oneWay代表不需要返回数据的请求,twoWay代表需要返回数据的请求
                if (req.isTwoWay()) {
                    Response res = new Response(req.getId(), req.getVersion());
                    res.setEvent(Response.HEARTBEAT_EVENT);
                    channel.send(res);
                    if (logger.isInfoEnabled()) {
                        int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                        if(logger.isDebugEnabled()) {
                            logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                            + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                            + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                        }
                    }
                }
                //直接结束请求
                return;
            }
            //客户端只要知道服务端活这就可以了,不需要其他操作
            if (isHeartbeatResponse(message)) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        new StringBuilder(32)
                            .append("Receive heartbeat response in thread ")
                            .append(Thread.currentThread().getName())
                            .toString());
                }
                return;
            }
            handler.received(channel, message);
        }
        
        //AllChannelHandler就是将所有的逻辑都通过线程池来调度,其余的就是直接调用内部handler的received方法
        
        //DecodeHandler 处理编码和解码的逻辑
        
        //HeaderExchangeHandler 主要处理请求的接收,回调(此时不会再对requestHandler进行调用)
        
        
    

    在这里我们只是粗略的了解到这些Handler的层级以及各自的关系,但其实在发起请求调用的时候首先进行的是Invoker的invoke调用,这些Handler只是处理请求结果的一些过程逻辑,也就是consumer端收到provider的返回结果之后的处理逻辑。
    下面我们要看一下作为consumer是如何把请求发送出去的:
    因为我们看到的接口类其实都是代理,所以真正的请求开始也是从代理开始的。

        //创建代理的时候指定了处理的InvokerInvocationHandler
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }
        
        //InvocationHandler的具体实现,在调用toString,hashCode和equals的时候会直接进行调用,不再产生远程的请求
        public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
        
        public InvokerInvocationHandler(Invoker<?> handler){
            this.invoker = handler;
        }
    
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            //invoke返回的结果是Result类型,recreate表示如果有结果的话就返回真正的调用结果,有异常的话就返回异常信息
            //我们在外面看到的Invocation信息都源自这里,主要包含了方法和参数信息
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
        //抛开集群和mock内容来看的话这里的Invoker可以理解为DubboInvoker,所以我们来看看DubboInvoker的invoke操作:
        //首先是进入到AbstractInvoker的invoke操作:
        public Result invoke(Invocation inv) throws RpcException {
            if(destroyed) {
                throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() 
                                                + " use dubbo version " + Version.getVersion()
                                                + " is DESTROYED, can not be invoked any more!");
            }
            RpcInvocation invocation = (RpcInvocation) inv;
            //这里的this就是DubboInvoker本身
            invocation.setInvoker(this);
            //设置attachment参数,设置完之后就可以在服务端取到具体的参数值,很方便服务端和客户端进行通信
            if (attachment != null && attachment.size() > 0) {
                invocation.addAttachmentsIfAbsent(attachment);
            }
            Map<String, String> context = RpcContext.getContext().getAttachments();
            if (context != null) {
                invocation.addAttachmentsIfAbsent(context);
            }
            if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
                invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            }
            //如果是异步的话需要设置一下InvocationId,方便在回调的时候找到对应的数据
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            
            
            try {
                return doInvoke(invocation);
                //将异常信息包装到RpcResult中,至于怎么处理这些异常信息,要看后面的逻辑了,原则上是RpcResult包含了调用的所有返回信息,包括异常信息
            } catch (InvocationTargetException e) { // biz exception
                Throwable te = e.getTargetException();
                if (te == null) {
                    return new RpcResult(e);
                } else {
                    if (te instanceof RpcException) {
                        ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                    }
                    return new RpcResult(te);
                }
            } catch (RpcException e) {
                if (e.isBiz()) {
                    return new RpcResult(e);
                } else {
                    throw e;
                }
            } catch (Throwable e) {
                return new RpcResult(e);
            }
        }
    }
    
        @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
            
            ExchangeClient currentClient;
            //同一个客户端只跟一个服务端建立一个长链接,也可能建立多个,如果是多个长链接的话就默认随机取一个
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                //是否是异步
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                //是否是单工
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
                //oneWay形式最简单,发出消息之后就不用管其他的事情了
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                //异步调用的话在发出请求之后后面还要取到结果,后面有机会可以讲一下具体如何取
                //本质上无论同步还是异步,在服务端都是异步的,调用方在调用发出之后就收到ResponseFuture对象,只不过异步的操作方式不会马上调用get方法获取结果,同步的方法会立马调用get获取数据
                } else if (isAsync) {
                    ResponseFuture future = currentClient.request(inv, timeout) ;
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {//同步非单工
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
            }
            // 捕捉异常的步骤忽略掉
        }
        
        //上面的request就是调用到了HeaderExchangeChannel.request
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            //创建request并设置req的属性,在生成的时候已经在req的内部创建了一个独有的ID
            Request req = new Request();
            req.setVersion("2.0.0");
            req.setTwoWay(true);
            //request=RpcInvocation
            req.setData(request);
            //创建默认的Future
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try{
                //继续调用channel的send,这里的channel其实就是NettyClient了,客户端的调用到这里其实已经结束了。接下来就是等待返回结果
                channel.send(req);
            }catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }
        //feature的ID非常重要,和request,response的ID保持一致,方便从返回的结果Response中找到对应的future
        public DefaultFuture(Channel channel, Request request, int timeout){
            this.channel = channel;
            this.request = request;
            this.id = request.getId();
            this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 放到全局的future容器中,以后方便根据ID寻找对应的feature
            FUTURES.put(id, this);
            //道理同上
            CHANNELS.put(id, channel);
        }
        
        private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();
    
    

    基本上到这里客户端调用的逻辑都讲到了,涉及到一些细节方面还没有进行梳理,但是大致上我们已经对于服务的调用有了一个轮廓性的认识,后面会逐篇展开其他细节的内容,希望在所有Dubbo内容都解释完的时候能有一个回归性的总结,把一个调用从开始到结束经过的所有过程都讲一下。

    相关文章

      网友评论

          本文标题:Dubbo 服务引用过程(二)

          本文链接:https://www.haomeiwen.com/subject/pjviwxtx.html