美文网首页
8.Dubbo远程调用(来自官网)

8.Dubbo远程调用(来自官网)

作者: 方雲 | 来源:发表于2021-04-20 00:05 被阅读0次

    8.1 简介

    Dubbo服务调用过程较为复杂,包含众多步骤,如发送请求、编解码、服务降级、过滤器链处理、序列化、线程派发以及响应请求等。

    8.1 源码分析

    源码分析前,我们先通过一张图了解Dubbo服务调用过程:


    调用过程

    首先消费者通过代理对象Proxy发起远程调用,接着通过网络客户端Client将编码后的请求发送给服务提供方的网络层,即Server。Server在收到请求后,首先对数据包进行解码,然后将解码后的请求发送给分发起Dispatcher,再由分发起将请求派发到指定的线程池,最后由线程池调用具体的服务。

    8.1.1 服务调用方式

    Dubbo支持同步和异步两种调用方式,其中异步调用还可细分为“有返回值”的异步调用和“无返回值”的异步调用。所谓“无返回值”异步调用是指服务消费方只管调用,不关心调用结果,此时Dubbo会直接返回一个空的RpcResult。若要使用异步特性,需要服务消费方手动进行配置。
    下面我们将使用Dubbo官方提供的Demo分析整个调用过程,我们从DemoService接口的代理类开始进行分析。Dubbo默认使用Javassist为服务接口生成动态代理类,因此我们需要现将代理类反编译才能看到代码:

    /**
     * Arthas 反编译步骤:
     * 1. 启动 Arthas
     *    java -jar arthas-boot.jar
     *
     * 2. 输入编号选择进程
     *    Arthas 启动后,会打印 Java 应用进程列表,如下:
     *    [1]: 11232 org.jetbrains.jps.cmdline.Launcher
     *    [2]: 22370 org.jetbrains.jps.cmdline.Launcher
     *    [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
     *    [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
     *    [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
     * 这里输入编号 3,让 Arthas 关联到启动类为 com.....Consumer 的 Java 进程上
     *
     * 3. 由于 Demo 项目中只有一个服务接口,因此此接口的代理类类名为 proxy0,此时使用 sc 命令搜索这个类名。
     *    $ sc *.proxy0
     *    com.alibaba.dubbo.common.bytecode.proxy0
     *
     * 4. 使用 jad 命令反编译 com.alibaba.dubbo.common.bytecode.proxy0
     *    $ jad com.alibaba.dubbo.common.bytecode.proxy0
     *
     * 更多使用方法请参考 Arthas 官方文档:
     *   https://alibaba.github.io/arthas/quick-start.html
     */
    public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
        // 方法数组
        public static Method[] methods;
        private InvocationHandler handler;
    
        public proxy0(InvocationHandler invocationHandler) {
            this.handler = invocationHandler;
        }
    
        public proxy0() {
        }
    
        public String sayHello(String string) {
            // 将参数存储到 Object 数组中
            Object[] arrobject = new Object[]{string};
            // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
            Object object = this.handler.invoke(this, methods[0], arrobject);
            // 返回调用结果
            return (String)object;
        }
    
        /** 回声测试方法 */
        public Object $echo(Object object) {
            Object[] arrobject = new Object[]{object};
            Object object2 = this.handler.invoke(this, methods[1], arrobject);
            return object2;
        }
    }
    

    如上,代理类的逻辑比较简单,首先将运行时参数存储到数组中,然后调用InvocationHandler接口实现类的invoke方法,得到调用结果,最后将结果强制类型转换并返回。接下来我们看InvocationHandler的源码:

    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            
            // 如果是Object 类中的方法(未被子类重写),比如 wait/notify,直接调用
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            
            // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            
            // 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    }
    

    此处要注意InvokerInvocationHandler中的成员变量invoker在集群中实际类型为MockClusterInvoker(具体代码在服务引用那一章,ReferenceConfig的createProxy方法,对于多个提供者的时候使用SPI包装扩展MockClusterWrapper创建invoker),而MockClusterInvoker内部就封装了服务降级逻辑,MockClusterInvoker是对FailoverClusterInvoker的一层包装,具体这些会在下一篇集群相关章节中分析。这里先直接分析DubboInvoker这种直连方式的实现。

    ublic abstract class AbstractInvoker<T> implements Invoker<T> {
        
        public Result invoke(Invocation inv) throws RpcException {
            if (destroyed.get()) {
                throw new RpcException("Rpc invoker for service ...");
            }
            RpcInvocation invocation = (RpcInvocation) inv;
            // 设置 Invoker
            invocation.setInvoker(this);
            if (attachment != null && attachment.size() > 0) {
                // 设置 attachment
                invocation.addAttachmentsIfAbsent(attachment);
            }
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                // 添加 contextAttachments 到 RpcInvocation#attachment 变量中
                invocation.addAttachments(contextAttachments);
            }
            if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
                // 设置异步信息到 RpcInvocation#attachment 中
                invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            try {
                // 抽象方法,由子类实现
                return doInvoke(invocation);
            } catch (InvocationTargetException e) {
                // ...
            } catch (RpcException e) {
                // ...
            } catch (Throwable e) {
                return new RpcResult(e);
            }
        }
    
        protected abstract Result doInvoke(Invocation invocation) throws Throwable;
        
        // 省略其他方法
    }
    

    上面的代码来自AbstractInvoker类,其中大部分代码用于添加信息到RpcInvocation#attachment变量中,添加完毕后,调用doInvoke执行后续的调用,这是一个抽象方法,直连由DubboInvoker实现:

    public class DubboInvoker<T> extends AbstractInvoker<T> {
        
        private final ExchangeClient[] clients;
        
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            // 设置 path 和 version 到 attachment 中
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                // 从 clients 数组中获取 ExchangeClient
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                // 获取异步配置
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                // isOneway 为 true,表示“单向”通信
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    
                // 异步无返回值
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    // 发送请求
                    currentClient.send(inv, isSent);
                    // 设置上下文中的 future 字段为 null
                    RpcContext.getContext().setFuture(null);
                    // 返回一个空的 RpcResult
                    return new RpcResult();
                } 
    
                // 异步有返回值
                else if (isAsync) {
                    // 发送请求,并得到一个 ResponseFuture 实例
                    ResponseFuture future = currentClient.request(inv, timeout);
                    // 设置 future 到上下文中
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    // 暂时返回一个空结果
                    return new RpcResult();
                } 
    
                // 同步调用
                else {
                    RpcContext.getContext().setFuture(null);
                    // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(..., "Invoke remote method timeout....");
            } catch (RemotingException e) {
                throw new RpcException(..., "Failed to invoke remote method: ...");
            }
        }
        
        // 省略其他方法
    }
    

    上面的代码包含了Dubbo对同步和异步调用的处理逻辑,搞懂了上面的代码,会对Dubbo的同步和异步调用方式又更深入的了解。Dubbo实现同步和异步调用比较关键的一点是在于由谁来调用ResponseFuture的get方法:同步模式下,由框架自身调用ResponseFuture的get方法。异步调用模式下,则由用户调用该方法。ResponseFuture是一个接口,我们来看一下它的默认实现类DefaultFuture:

    ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法。ResponseFuture 是一个接口,下面我们来看一下它的默认实现类 DefaultFuture 的源码。
    
    public class DefaultFuture implements ResponseFuture {
        
        private static final Map<Long, Channel> CHANNELS = 
            new ConcurrentHashMap<Long, Channel>();
    
        private static final Map<Long, DefaultFuture> FUTURES = 
            new ConcurrentHashMap<Long, DefaultFuture>();
        
        private final long id;
        private final Channel channel;
        private final Request request;
        private final int timeout;
        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
        private volatile Response response;
        
        public DefaultFuture(Channel channel, Request request, int timeout) {
            this.channel = channel;
            this.request = request;
            
            // 获取请求 id,这个 id 很重要,后面还会见到
            this.id = request.getId();
            this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 存储 <requestId, DefaultFuture> 映射关系到 FUTURES 中
            FUTURES.put(id, this);
            CHANNELS.put(id, channel);
        }
        
        @Override
        public Object get() throws RemotingException {
            return get(timeout);
        }
    
        @Override
        public Object get(int timeout) throws RemotingException {
            if (timeout <= 0) {
                timeout = Constants.DEFAULT_TIMEOUT;
            }
            
            // 检测服务提供方是否成功返回了调用结果
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    // 循环检测服务提供方是否成功返回了调用结果
                    while (!isDone()) {
                        // 如果调用结果尚未返回,这里等待一段时间
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        // 如果调用结果成功返回,或等待超时,此时跳出 while 循环,执行后续的逻辑
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                
                // 如果调用结果仍未返回,则抛出超时异常
                if (!isDone()) {
                    throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                }
            }
            
            // 返回调用结果
            return returnFromResponse();
        }
        
        @Override
        public boolean isDone() {
            // 通过检测 response 字段为空与否,判断是否收到了调用结果
            return response != null;
        }
        
        private Object returnFromResponse() throws RemotingException {
            Response res = response;
            if (res == null) {
                throw new IllegalStateException("response cannot be null");
            }
            
            // 如果调用结果的状态为 Response.OK,则表示调用过程正常,服务提供方成功返回了调用结果
            if (res.getStatus() == Response.OK) {
                return res.getResult();
            }
            
            // 抛出异常
            if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
            }
            throw new RemotingException(channel, res.getErrorMessage());
        }
        
        // 省略其他方法
    }
    

    如上,当消费者还没接受到调用结果是,用户线程调用get方法会被阻塞住。同步调用模式下,框架获得DefaultFuture对象后,会理解调用get方法进行等待。而异步模式下则是将该对象封装到FutureAdapter实例中,并将FutureAdapter实例设置到RpcContext中,供用户使用。FutureAdapter是一个适配器,用于将Dubbo的ResposneFuture与jdk的Future进行适配,当用户线程调用Future的get方法,经过FutureAdapter适配,最终会调用ResponseFuture实现类对象的get方法,也就是DefaultFuture的get方法。

    目前最新的代码不再使用ResposneFuture,而是使用jdk后来提供的CompletableFuture。

    8.2 服务消费方发送请求

    8.2.1 发送请求

    先看同步模式下,服务消费方是如何发送调用请求的:


    上图展示了服务消费方发送请求过程的部分调用栈,图中可以看出经过多次调用后,才将请求数据送至NettyNioClientSocketChannel,这样做的原因是通过Exchange层为框架引入Request和Response语义。我们先来分析ReferenceCountExchangeClient的源码:

    final class ReferenceCountExchangeClient implements ExchangeClient {
    
        private final URL url;
        private final AtomicInteger referenceCount = new AtomicInteger(0);
    
        public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
            this.client = client;
            // 引用计数自增
            referenceCount.incrementAndGet();
            this.url = client.getUrl();
            
            // ...
        }
    
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            // 直接调用被装饰对象的同签名方法
            return client.request(request);
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            // 直接调用被装饰对象的同签名方法
            return client.request(request, timeout);
        }
    
        /** 引用计数自增,该方法由外部调用 */
        public void incrementAndGetCount() {
            // referenceCount 自增
            referenceCount.incrementAndGet();
        }
        
            @Override
        public void close(int timeout) {
            // referenceCount 自减
            if (referenceCount.decrementAndGet() <= 0) {
                if (timeout == 0) {
                    client.close();
                } else {
                    client.close(timeout);
                }
                client = replaceWithLazyClient();
            }
        }
        
        // 省略部分方法
    }
    

    ReferenceCountExchangeClient内部定义了一个引用技术变量referenceCount,每当其持有的client对象被引用一次都会进行自增。每当close方法被调用时,进行自减。这个类只是实现了一个引用技术的功能,其他方法均直接调用被装饰对象的方法,所以我们继续分析HeaderExchangeClient这个类:

    ReferenceCountExchangeClient 的源码。
    
    final class ReferenceCountExchangeClient implements ExchangeClient {
    
        private final URL url;
        private final AtomicInteger referenceCount = new AtomicInteger(0);
    
        public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
            this.client = client;
            // 引用计数自增
            referenceCount.incrementAndGet();
            this.url = client.getUrl();
            
            // ...
        }
    
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            // 直接调用被装饰对象的同签名方法
            return client.request(request);
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            // 直接调用被装饰对象的同签名方法
            return client.request(request, timeout);
        }
    
        /** 引用计数自增,该方法由外部调用 */
        public void incrementAndGetCount() {
            // referenceCount 自增
            referenceCount.incrementAndGet();
        }
        
            @Override
        public void close(int timeout) {
            // referenceCount 自减
            if (referenceCount.decrementAndGet() <= 0) {
                if (timeout == 0) {
                    client.close();
                } else {
                    client.close(timeout);
                }
                client = replaceWithLazyClient();
            }
        }
        
        // 省略部分方法
    }
    ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。所以这里就不多说了,继续向下分析,这次是 HeaderExchangeClient。
    
    public class HeaderExchangeClient implements ExchangeClient {
    
        private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
        private final Client client;
        private final ExchangeChannel channel;
        private ScheduledFuture<?> heartbeatTimer;
        private int heartbeat;
        private int heartbeatTimeout;
    
        public HeaderExchangeClient(Client client, boolean needHeartbeat) {
            if (client == null) {
                throw new IllegalArgumentException("client == null");
            }
            this.client = client;
            
            // 创建 HeaderExchangeChannel 对象
            this.channel = new HeaderExchangeChannel(client);
            
            // 以下代码均与心跳检测逻辑有关
            String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
            this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
            this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
            if (heartbeatTimeout < heartbeat * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            }
            if (needHeartbeat) {
                // 开启心跳检测定时器
                startHeartbeatTimer();
            }
        }
    
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            // 直接 HeaderExchangeChannel 对象的同签名方法
            return channel.request(request);
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            // 直接 HeaderExchangeChannel 对象的同签名方法
            return channel.request(request, timeout);
        }
    
        @Override
        public void close() {
            doClose();
            channel.close();
        }
        
        private void doClose() {
            // 停止心跳检测定时器
            stopHeartbeatTimer();
        }
    
        private void startHeartbeatTimer() {
            stopHeartbeatTimer();
            if (heartbeat > 0) {
                heartbeatTimer = scheduled.scheduleWithFixedDelay(
                        new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                            @Override
                            public Collection<Channel> getChannels() {
                                return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                            }
                        }, heartbeat, heartbeatTimeout),
                        heartbeat, heartbeat, TimeUnit.MILLISECONDS);
            }
        }
    
        private void stopHeartbeatTimer() {
            if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
                try {
                    heartbeatTimer.cancel(true);
                    scheduled.purge();
                } catch (Throwable e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
            heartbeatTimer = null;
        }
        
        // 省略部分方法
    }
    

    HeaderExchangeClient很多方法都只有一行代码,即直接调用HeaderExchangeChannel 对象的方法。那么HeaderExchangeClient的用处是什么呢?其实只是封装了一些关于心跳检测的逻辑,所以我们还要进一步分析HeaderExchangeChannel的实现:

    HeaderExchangeChannel 对象的同签名方法。那 HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑。心跳检测并非本文所关注的点,因此就不多说了,继续向下看。
    
    final class HeaderExchangeChannel implements ExchangeChannel {
        
        private final Channel channel;
        
        HeaderExchangeChannel(Channel channel) {
            if (channel == null) {
                throw new IllegalArgumentException("channel == null");
            }
            
            // 这里的 channel 指向的是 NettyClient
            this.channel = channel;
        }
        
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(..., "Failed to send request ...);
            }
            // 创建 Request 对象
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            // 设置双向通信标志为 true
            req.setTwoWay(true);
            // 这里的 request 变量类型为 RpcInvocation
            req.setData(request);
                                            
            // 创建 DefaultFuture 对象
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                // 调用 NettyClient 的 send 方法发送请求
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            // 返回 DefaultFuture 对象
            return future;
        }
    }
    

    到这里,我们终于看到了Request语义,方法中首先创建一个Request对象,把RpcInvocation和一些其他信息放进Request中,然后将该对象传递给NettyClient的send方法,进行后续的调用。NettyClient的send方法实现直接继承自AbstractPeer类:

    w DefaultFuture(channel, req, timeout);
            try {
                // 调用 NettyClient 的 send 方法发送请求
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            // 返回 DefaultFuture 对象
            return future;
        }
    }
    到这里大家终于看到了 Request 语义了,上面的方法首先定义了一个 Request 对象,然后再将该对象传给 NettyClient 的 send 方法,进行后续的调用。需要说明的是,NettyClient 中并未实现 send 方法,该方法继承自父类 AbstractPeer,下面直接分析 AbstractPeer 的代码。
    
    public abstract class AbstractPeer implements Endpoint, ChannelHandler {
        
        @Override
        public void send(Object message) throws RemotingException {
            // 该方法由子类AbstractClient 类实现
            send(message, url.getParameter(Constants.SENT_KEY, false));
        }
        
        // 省略其他方法
    }
    
    public abstract class AbstractClient extends AbstractEndpoint implements Client {
        
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            if (send_reconnect && !isConnected()) {
                connect();
            }
            
            // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
            Channel channel = getChannel();
            if (channel == null || !channel.isConnected()) {
                throw new RemotingException(this, "message can not send ...");
            }
            
            // 继续向下调用
            channel.send(message, sent);
        }
        
        protected abstract Channel getChannel();
        
        // 省略其他方法
    }
    

    注意此处继承关系较多,NettyClient继承了AbstractClient,AbstractClient继承了AbstractEndpoint,AbstractEndpoint继承了AbstractPeer。
    默认情况下,Dubbo使用Netty作为底层的通信框架,因此下面我们到NettyClient子类看一下getChannel方法的实现:

    public class NettyClient extends AbstractClient {
        
        // 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
        private volatile Channel channel;
    
        @Override
        protected com.alibaba.dubbo.remoting.Channel getChannel() {
            Channel c = channel;
            if (c == null || !c.isConnected())
                return null;
            // 获取一个 NettyChannel 类型对象
            return NettyChannel.getOrAddChannel(c, getUrl(), this);
        }
    }
    
    final class NettyChannel extends AbstractChannel {
    
        private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = 
            new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
    
        private final org.jboss.netty.channel.Channel channel;
        
        /** 私有构造方法 */
        private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
            super(url, handler);
            if (channel == null) {
                throw new IllegalArgumentException("netty channel == null;");
            }
            this.channel = channel;
        }
    
        static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
            if (ch == null) {
                return null;
            }
            
            // 尝试从集合中获取 NettyChannel 实例
            NettyChannel ret = channelMap.get(ch);
            if (ret == null) {
                // 如果 ret = null,则创建一个新的 NettyChannel 实例
                NettyChannel nc = new NettyChannel(ch, url, handler);
                if (ch.isConnected()) {
                    // 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
                    ret = channelMap.putIfAbsent(ch, nc);
                }
                if (ret == null) {
                    ret = nc;
                }
            }
            return ret;
        }
    }
    

    获取到 NettyChannel 实例后,即可进行后续的调用。下面看一下 NettyChannel 的 send 方法。

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
    
        boolean success = true;
        int timeout = 0;
        try {
            // 发送消息(包含请求和响应消息)
            ChannelFuture future = channel.write(message);
            
            // sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
            //   1. true: 等待消息发出,消息发送失败将抛出异常
            //   2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
            // 默认情况下 sent = false;
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                // 等待消息发出,若在规定时间没能发出,success 会被置为 false
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message ...");
        }
    
        // 若 success 为 false,这里抛出异常
        if (!success) {
            throw new RemotingException(this, "Failed to send message ...");
        }
    }
    

    经历多次调用,到这里请求数据的发送过程就结束了,过程漫长。为了便于大家阅读代码,这里以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。

    proxy0#sayHello(String)
      —> InvokerInvocationHandler#invoke(Object, Method, Object[])
        —> MockClusterInvoker#invoke(Invocation)
          —> AbstractClusterInvoker#invoke(Invocation)
            —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
              —> Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
                —> ListenerInvokerWrapper#invoke(Invocation) 
                  —> AbstractInvoker#invoke(Invocation) 
                    —> DubboInvoker#doInvoke(Invocation)
                      —> ReferenceCountExchangeClient#request(Object, int)
                        —> HeaderExchangeClient#request(Object, int)
                          —> HeaderExchangeChannel#request(Object, int)
                            —> AbstractPeer#send(Object)
                              —> AbstractClient#send(Object, boolean)
                                —> NettyChannel#send(Object, boolean)
                                  —> NioClientSocketChannel#write(Object)
    

    8.2.2 请求编码

    了解netty的同学,很自然就想到,在前面invoker的调用到达NettyClient之后,对与编解码的处理肯定会通过NettyClient将负责编解码的ChannelHandler添加到netty的pipeline中,具体逻辑在NettyClient的doOpen方法:


    image.png

    如上,借助NettyCodecAdapter将Codec2接口借助SPI的方式加载编解码实现类构造出相对应的编解码ChannelHanlder。
    具体编解码的实现以及Dubbo提供的SPI接口ChannelHandler和Dubbo实现netty提供的出站入站ChannelHandler是怎么关联的,可以参考上一篇的讲解。

    8.3 服务提供方接收请求

    8.3.1 请求解码

    通过netty的channelHandler找到Codec2接口的实现类进行解码,具体实现参考上一篇。

    8.3.2 调用服务

    解码器将数据包解析成Request对象后,NettyHandler的messageReceived方法接收到这个对象,会继续向下传递。通过SPI的包装扩展,一步步传递给NettyServer、MultiMessagehandler、HeartbeatHandler以及AllChannelHandler。最终由AllChannelHandler将该对象分装到Runnable实现类对象中,并将Runnable放入线程池中执行后续的调用逻辑,整个调用栈如下:

    NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
      —> AbstractPeer#received(Channel, Object)
        —> MultiMessageHandler#received(Channel, Object)
          —> HeartbeatHandler#received(Channel, Object)
            —> AllChannelHandler#received(Channel, Object)
              —> ExecutorService#execute(Runnable)    // 由线程池执行后续的调用逻辑
    

    我们直接来看一下最后一个处理器AllChannelHandler的逻辑,它是一个将所有消息都派发到业务线程池去执行的策略:

    public class AllChannelHandler extends WrappedChannelHandler {
    
        public AllChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
    
        /** 处理连接事件 */
        @Override
        public void connected(Channel channel) throws RemotingException {
            // 获取线程池
            ExecutorService cexecutor = getExecutorService();
            try {
                // 将连接事件派发到线程池中处理
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException(..., " error when process connected event .", t);
            }
        }
    
        /** 处理断开事件 */
        @Override
        public void disconnected(Channel channel) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException(..., "error when process disconnected event .", t);
            }
        }
    
        /** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                // 将请求和响应消息派发到线程池中处理
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                if(message instanceof Request && t instanceof RejectedExecutionException){
                    Request request = (Request)message;
                    // 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted 
                    // 错误信息封装到 Response 中,并返回给服务消费方。
                    if(request.isTwoWay()){
                        String msg = "Server side(" + url.getIp() + "," + url.getPort() 
                            + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        // 返回包含错误信息的 Response 对象
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(..., " error when process received event .", t);
            }
        }
    
        /** 处理异常信息 */
        @Override
        public void caught(Channel channel, Throwable exception) throws RemotingException {
            ExecutorService cexecutor = getExecutorService();
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
            } catch (Throwable t) {
                throw new ExecutionException(..., "error when process caught event ...");
            }
        }
    }
    

    如上,请求对象会被封装到ChannelEventRunnable中,该类将会是服务调用过程的新起点,所以接下来我们以它为起点向下探索:

    public class ChannelEventRunnable implements Runnable {
        
        private final ChannelHandler handler;
        private final Channel channel;
        private final ChannelState state;
        private final Throwable exception;
        private final Object message;
        
        @Override
        public void run() {
            // 检测通道状态,对于请求或响应消息,此时 state = RECEIVED
            if (state == ChannelState.RECEIVED) {
                try {
                    // 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
                    // 注意这个handler是AllChannelHandler传过来的,
                    // AllChannelHandler本身是handlerWrapper,通过把其包装的handler传递给ChannelEventRunnable,
                    // 使其能够在线程池中继续handler(还有多层包装)的处理
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ... message is ...");
                }
            } 
            
            // 其他消息类型通过 switch 进行处理
            else {
                switch (state) {
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("... operation error, channel is ...");
                    }
                    break;
                case DISCONNECTED:
                    // ...
                case SENT:
                    // ...
                case CAUGHT:
                    // ...
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
                }
            }
    
        }
    }
    

    接下来,AllChannelHandler给ChannelEventRunnable传递的handler是DecodeHandler:

    DecodeHandler extends AbstractChannelHandlerDelegate {
    
        public DecodeHandler(ChannelHandler handler) {
            super(handler);
        }
    
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Decodeable) {
                // 对 Decodeable 接口实现类对象进行解码
                decode(message);
            }
    
            if (message instanceof Request) {
                // 对 Request 的 data 字段进行解码
                decode(((Request) message).getData());
            }
    
            if (message instanceof Response) {
                // 对 Request 的 result 字段进行解码
                decode(((Response) message).getResult());
            }
    
            // 执行后续逻辑
            handler.received(channel, message);
        }
    
        private void decode(Object message) {
            // Decodeable 接口目前有两个实现类,
            // 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult
            if (message != null && message instanceof Decodeable) {
                try {
                    // 执行解码逻辑
                    ((Decodeable) message).decode();
                } catch (Throwable e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                    }
                }
            }
        }
    }
    

    DecodeHandler对消息进行解码,然后继续调用其包装的下一个handler的received方法,即HeaderExchangeHandler:

    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    
        private final ExchangeHandler handler;
    
        public HeaderExchangeHandler(ExchangeHandler handler) {
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            this.handler = handler;
        }
    
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                // 处理请求对象
                if (message instanceof Request) {
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        // 处理事件
                        handlerEvent(channel, request);
                    } 
                    // 处理普通的请求
                    else {
                        // 双向通信
                        if (request.isTwoWay()) {
                            // 向后调用服务,并得到调用结果
                            Response response = handleRequest(exchangeChannel, request);
                            // 将调用结果返回给服务消费端
                            channel.send(response);
                        } 
                        // 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
                        else {
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                }      
                // 处理响应对象,服务消费方会执行此处逻辑,后面分析
                else if (message instanceof Response) {
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    // telnet 相关,忽略
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    
        Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
            Response res = new Response(req.getId(), req.getVersion());
            // 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
            if (req.isBroken()) {
                Object data = req.getData();
    
                String msg;
                if (data == null)
                    msg = null;
                else if
                    (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
                else
                    msg = data.toString();
                res.setErrorMessage("Fail to decode request due to: " + msg);
                // 设置 BAD_REQUEST 状态
                res.setStatus(Response.BAD_REQUEST);
    
                return res;
            }
            
            // 获取 data 字段值,也就是 RpcInvocation 对象
            Object msg = req.getData();
            try {
                // 继续向下调用
                Object result = handler.reply(channel, msg);
                // 设置 OK 状态码
                res.setStatus(Response.OK);
                // 设置调用结果
                res.setResult(result);
            } catch (Throwable e) {
                // 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(e));
            }
            return res;
        }
    }
    

    到这里,我们看到了比较清晰的请求和响应逻辑,对于双向通信,HeaderExchangeHandler首先向后调用,得到调用结果,然后将调用结果封装到Response对象,最后返回给服务消费方。如果请求不合法或者调用失败,则将错误信息封装到Response对象,并返回给服务消费方。接下来,我们把剩余的调用过程分析完,下一个handler是DubboProtocol中的匿名类ExchangeHandlerAdapter(这里有点糊涂,匿名类不可能通过spi包装,难道整个包装是通过dispatcher=all配置项手动包装的?)

    ExchangeHandlerAdapter() {
    
            @Override
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    // 获取 Invoker 实例
                    Invoker<?> invoker = getInvoker(channel, inv);
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        // 回调相关,忽略
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    // 通过 Invoker 调用具体的服务
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: ...");
            }
            
            // 忽略其他方法
        }
        
        Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
            // 忽略回调和本地存根相关逻辑
            // ...
            
            int port = channel.getLocalAddress().getPort();
            
            // 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如:
            //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
            String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    
            // 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
            // 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中
            DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    
            if (exporter == null)
                throw new RemotingException(channel, "Not found exported service ...");
    
            // 获取 Invoker 对象,并返回
            return exporter.getInvoker();
        }
        
        // 忽略其他方法
    }
    

    以上逻辑用于获取与指定服务对应的 Invoker 实例,并通过 Invoker 的 invoke 方法调用服务逻辑。invoke 方法定义在 AbstractProxyInvoker 中,代码如下:

    public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
    
        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            try {
                // 调用 doInvoke 执行后续的调用,并将调用结果封装到 RpcResult 中,并
                return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
            } catch (InvocationTargetException e) {
                return new RpcResult(e.getTargetException());
            } catch (Throwable e) {
                throw new RpcException("Failed to invoke remote proxy method ...");
            }
        }
        
        protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
    }
    

    如上,doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,创建逻辑如下:

    public class JavassistProxyFactory extends AbstractProxyFactory {
        
        // 省略其他方法
    
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            // 创建匿名类对象
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    // 调用 invokeMethod 方法进行后续的调用
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    }
    

    Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。

    /** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */
    public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
        public static String[] pns;
        public static Map pts;
        public static String[] mns;
        public static String[] dmns;
        public static Class[] mts0;
    
        // 省略其他方法
    
        public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
            DemoService demoService;
            try {
                // 类型转换
                demoService = (DemoService)object;
            }
            catch (Throwable throwable) {
                throw new IllegalArgumentException(throwable);
            }
            try {
                // 根据方法名调用指定的方法
                if ("sayHello".equals(string) && arrclass.length == 1) {
                    return demoService.sayHello((String)arrobject[0]);
                }
            }
            catch (Throwable throwable) {
                throw new InvocationTargetException(throwable);
            }
            throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
        }
    }
    

    到这里,整个服务调用过程就分析完了。最后把调用过程贴出来,如下:

    ChannelEventRunnable#run()
      —> DecodeHandler#received(Channel, Object)
        —> HeaderExchangeHandler#received(Channel, Object)
          —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
            —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
              —> Filter#invoke(Invoker, Invocation)
                —> AbstractProxyInvoker#invoke(Invocation)
                  —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                    —> DemoServiceImpl#sayHello(String)
    

    剩余的将调用结果封装后,编码发给消费方的过程就不再累赘了。
    消费方接收到响应后,也是类似的过程。响应数据解码完成后,Dubbo会将响应对象派发到线程池,要注意的是线程池中的线程并不是用户的调用线程(应该是客户端的业务线程池),所以要想办法将响应对象从线程池线程传递到用户线程上。还记得我们前面分析的用户线程在发送我请求后,会调用DefaultFuture的get方法等待对象的到来。当响应对象到来后,用户线程被唤醒,并通过调用编号获取属于自己的响应对象,具体实现如下:

    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
       
       @Override
       public void received(Channel channel, Object message) throws RemotingException {
           channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
           ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
           try {
               if (message instanceof Request) {
                   // 处理请求,前面已分析过,省略
               } else if (message instanceof Response) {
                   // 处理响应
                   handleResponse(channel, (Response) message);
               } else if (message instanceof String) {
                   // telnet 相关,忽略
               } else {
                   handler.received(exchangeChannel, message);
               }
           } finally {
               HeaderExchangeChannel.removeChannelIfDisconnected(channel);
           }
       }
    
       static void handleResponse(Channel channel, Response response) throws RemotingException {
           if (response != null && !response.isHeartbeat()) {
               // 继续向下调用
               DefaultFuture.received(channel, response);
           }
       }
    }
    
    public class DefaultFuture implements ResponseFuture {  
       
       private final Lock lock = new ReentrantLock();
       private final Condition done = lock.newCondition();
       private volatile Response response;
       
       public static void received(Channel channel, Response response) {
           try {
               // 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
               DefaultFuture future = FUTURES.remove(response.getId());
               if (future != null) {
                   // 继续向下调用
                   future.doReceived(response);
               } else {
                   logger.warn("The timeout response finally returned at ...");
               }
           } finally {
               CHANNELS.remove(response.getId());
           }
       }
    
       private void doReceived(Response res) {
           lock.lock();
           try {
               // 保存响应对象
               response = res;
               if (done != null) {
                   // 唤醒用户线程
                   done.signal();
               }
           } finally {
               lock.unlock();
           }
           if (callback != null) {
               invokeCallback(callback);
           }
       }
    }
    

    以上逻辑是将响应对象保存到相应的 DefaultFuture 实例中,然后再唤醒用户线程,随后用户线程即可从 DefaultFuture 实例中获取到相应结果。

    相关文章

      网友评论

          本文标题:8.Dubbo远程调用(来自官网)

          本文链接:https://www.haomeiwen.com/subject/vrmilltx.html