美文网首页
9.dubbo源码-调用服务

9.dubbo源码-调用服务

作者: 阿飞的博客 | 来源:发表于2017-11-07 18:53 被阅读443次

    1、RPC调用

    dubbo服务调用只需在spring.xml中如下配置后,就可以调用本地方法一样,调用provider提供的远程服务:

    <dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" />

    dubbo服务调用链路图

    dubbo服务调用前部分链路如下图所示,下面根据这张图以调用com.alibaba.dubbo.demo.DemoService.sayHello("afei")为例,一步一步分析dubbo服务的调用过程:

    dubbo服务调用-1.png

    InvokerInvocationHandler

    demoService.sayHello("afei") 这样的RPC调用,被Dubbo代理后,就会调用InvokerInvocationHandler中的invoke()方法。源码如下:

    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
        
        public InvokerInvocationHandler(Invoker<?> handler){
            this.invoker = handler;
        }
    
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 得到调用的方法名称
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // 调用toString()方法的特殊处理方式
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            // 调用hashCode()方法的特殊处理方式
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            // 调用equals()方法的特殊处理方式
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            // 常规的dubbo调用,都走这里,把调用的方法名称和参数封装成RpcInvocation对象,然后调用MockClusterInvoker中的invoke()方法
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    }
    
    // RpcInvocation的定义如下,包含了一些RPC调用信息:方法名,参数类型,参数值,Dubbo调用的一些附属信息attachments,以及调用的Invoker(attachments和invoker在后面会赋值)
    public class RpcInvocation implements Invocation, Serializable {
        private static final long serialVersionUID = -4355285085441097045L;
        private String               methodName;
        private Class<?>[]           parameterTypes;
        private Object[]             arguments;
        private Map<String, String>  attachments;
        private transient Invoker<?> invoker;
        ... ...
    }
    

    invoke()方法中Object proxy就是代理的对象;Method method就是本次调用的方法,即DemoService中的sayHello(String)方法;Object[] args就是调用的参数,即"afei",组装成Object[]就是new Object[]{"afei"}。

    MockClusterInvoker

    接下来分析MockClusterInvoker中的invoke()方法。部分核心源码如下:

    public class MockClusterInvoker<T> implements Invoker<T>{
    
        ... ...
        
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
            // 获取mock的值,默认为false;
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();         
            if (value.length() == 0 || value.equalsIgnoreCase("false")){
                // 如果在<dubbo:reference>中没有申明mock(默认方式),或者申明为false,那么走这里的逻辑
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                // 强制mock调用方式的WARN日志
                if (logger.isWarnEnabled()) {
                    logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " +  directory.getUrl());
                }
                //force:direct mock
                result = doMockInvoke(invocation, null);
            } else{
                //fail-mock
                try {
                    // 普通的mock方式,例如申明mock="com.alibaba.dubbo.demo.consumer.mock.DemoServiceMock",那么在RPC调用抛出RPC异常时才启用mock调用;
                    result = this.invoker.invoke(invocation);
                }catch (RpcException e) {
                    if (e.isBiz()) {
                        throw e;
                    } else {
                        if (logger.isWarnEnabled()) {
                            logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " +  directory.getUrl(), e);
                        }
                        result = doMockInvoke(invocation, e);
                    }
                }
            }
            ... ...
        }
    }
    

    mock申明方式:<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" version="1.0.0" mock="false"/>,从这段源码可知,dubbo提供了三种策略:
    1、不需要mock,直接调用AbstractClusterInvoker(默认方式)
    2、强制mock方式调用;
    3、先AbstractClusterInvoker方式调用,如果有RpcException(比如没有任何可用的Provider),再以mock方式调用;

    想要详细了解mock的使用方式,请参考dubbo一些你不一定知道但是很好用的功能中的"本地伪装"

    AbstractClusterInvoker

    接下来调用AbstractClusterInvoker中的invoke()方法,部分源码如下所示:

    public Result invoke(final Invocation invocation) throws RpcException {
    
        checkWheatherDestoried();
    
        LoadBalance loadbalance;
        // 从Diectory中得到所有可用的,经过路由过滤的Invoker集合
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            // 如果有可用的Invoker,那么根据第一个Invoker得到其LoadBalance策略
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            // // 如果没有可用的Invoker,那么采用默认的LoadBalance策略(随机策略)
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        // 如果异步调用,那么在attachment中给id赋值(值是自增的,通过AtomicLong.getAndIncrement()得到)
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // doInvoke()定义在AbstractClusterInvoker中是一个抽象方法,所以这里采用了模板方法设计模式,调用FailoverClusterInvoker(默认是failover集群容错)中的doInvoke()方法
        return doInvoke(invocation, invokers, loadbalance);
    }
    
    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                           LoadBalance loadbalance) throws RpcException;
    

    FailoverClusterInvoker

    FailoverClusterInvoker的分析,请参考dubbo源码-集群容错,这篇文章对dubbo支持的所有的集群容错处理都一一进行了比较详细的分析;但是不管哪种集群容错处理,接下来都会调用invoker.invoke(invocation)得到Result;RPC调用invoker.invoke(invocation);的调用关系链图如下,根据这张图一步一步分析每个步骤:

    dubbo服务调用-2.png

    InvokerWrapper

    InvokerWrapper中会初始化Consumer端的调用过滤链,然后在FailoverClusterInvoker中调用invoker.invoke(invocation)时一一执行每个Filter:

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // 得到Consumer端的Filter集合
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    ... ...
                    // 通过Filter的next()方法遍历执行Filter链上所有的Filter
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                    ... ...
                };
            }
        }
        return last;
    }
    

    Filter

    Consumer端Filter有ConsumerContextFilter、FutureFilter、MonitorFilter等;这里不一一讲解,里面的业务都比较简单;执行完Filter链后,调用AbstractInvoker中的invoke()方法;

    AbstractInvoker

    AbstractInvoker中申明了抽象方法:protected abstract Result doInvoke(Invocation invocation) throws Throwable;,所以,这里会以模板方法设计模式调用DubboInvoker中的doInvoker()方法;接下来的调用关系链如下图所示:

    dubbo服务调用-3.png

    DubboInvoker

    DubboInvoker.doInvoke(Invocation)核心源码如下:

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // RpcInvocation中attachments设置path和version并赋值
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        
        ExchangeClient currentClient;
        // 如果只有一个Client,直接选择;如果多个Client,轮询
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 是否异步调用,默认false
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // 是否单边调用,即不需要等待返回结果,默认false
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // 获取Consumer侧的timeout,默认1s
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                ... ...
            } else if (isAsync) {
                ... ...
            } else {
                // 重点关注这里,即默认实现
                RpcContext.getContext().setFuture(null);
                // 发送请求后,调用DefaultFuture.get()方法获取远程响应的结果
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

    说明:currentClient.request(inv, timeout)得到的是ResponseFuture类型结果,调用get()返回Result对象;

    HeaderExchangeChannel

    HeaderExchangeChannel.request(Object request, int timeout)核心源码如下:

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        ... ...
        // 构造一个准备RPC远程调用的Request类型参数
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        // 将调用该方法前的RpcInvocation类型请求参数封装到Request中
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
    

    DefaultFuture future = new DefaultFuture(channel, req, timeout);源码解读:

    这里比较重要,包含了通过netty调用后,如何拿到调用结果。

    public DefaultFuture(Channel channel, Request request, int timeout){
        this.channel = channel;
        this.request = request;
        // request.getId即得到这一次请求的id,id生成方式通过AtomicLong.getAndIncrement()得到;源码参考Request.newId();这个方法会不会溢出?getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID,所以不会溢出;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        // dubbo会根据请求ID从这个map中就能拿到对应的响应结果
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
    

    说明:由于请求ID是从AtomicLong取得,所以理论上是唯一的;即使当达到AtomicLong的最大值后又从MIN_VALUE开始,理论上同一个ID对应的请求不可能存在这么长时间从而导致下一次轮回ID碰撞;

    AtomicLong溢出问题可以通过下面一段代码验证:

    /**
     * @author afei
     */
    public class AtomicLongTest {
        public static void main(String[] args) {
            AtomicLong al = new AtomicLong(Long.MAX_VALUE-2);
            for (int i=0; i<5; i++){
                System.out.println(al.getAndIncrement());
            }
        }
    }
    
    运行结果如下,达到MAX_VALUE后下一次getAndIncrement()就是MIN_VALUE,所以getAndIncrement()不会溢出:
    9223372036854775805
    9223372036854775806
    9223372036854775807
    -9223372036854775808
    -9223372036854775807
    

    AbstractClient

    Channel准备发送请求消息到远程服务的核心源码:

    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()){
            connect();
        }
        Channel channel = getChannel();
        //TODO getChannel返回的状态是否包含null需要改进
        if (channel == null || ! channel.isConnected()) {
          throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        channel.send(message, sent);
    }
    

    NettyChannel

    NettyChannel.send(Object message, boolean sent)是真正调用Netty把请求消息通过NIO方式发给远程服务的地方,message即dubbo封装的Request类型请求参数,核心属性是mData,为RpcInvocation类型,源码如下:

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
        
        boolean success = true;
        int timeout = 0;
        try {
            // 这里就是调用netty的NioClientSocketChannel.write(Object message)方法将请求message发送到Provider
            ChannelFuture future = channel.write(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } 
        ... ...
    }
    

    2. 获取结果

    通过Netty以NIO方式发送请求后,接下来分析dubbo如果拿到Provider响应的结果,并把结果和请求对应起来(因为是异步调用,不能把结果和请求对应关系搞混淆);由前面分析HeaderExchangeChannel可知,dubbo调用(Result) currentClient.request(inv, timeout).get(),通过ResponseFuture.get()方法得到RpcResult结果,ResponseFuture的实现就是DefaultFuture

    DefaultFuture

    DefaultFuture中get()方法的核心源码:

    public Object get(int timeout) throws RemotingException {
        // 如果Consumer端指定的timeout不大于0,那么设置为默认值1s
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        // isDone()就是判断 response != null
        if (! isDone()) {
            long start = System.currentTimeMillis();
            // 通过ReentrantLock锁保证线程安全,lock定义为:private final Lock lock = new ReentrantLock();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
    
    private Object returnFromResponse() throws RemotingException {
        // 全局申明的private volatile Response response就是结果,后面会分析response是怎么被赋值的;
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        // 如果是正常的结果,直接返回
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }
        // 如果是超时的结果,那么抛出超时异常
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
    

    HeaderExchangeHandler

    发送RPC请求后,在HeaderExchangeHandler.received()中接收Porvider返回的响应结果(通过dubbo源码-NettyClient分析可知,NettyHandler是消息的handler,NettyHandler中的messageReceived()即消息接收方法,经过解码后,最终调用的就是HeaderExchangeHandler.received()),源码如下:

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                ... ...
            } else if (message instanceof Response) {
                // 这里处理响应结果
                handleResponse(channel, (Response) message);
            } 
            ... ...
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
    
    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
    

    DefaultFure.received()方法源码:

    public static void received(Channel channel, Response response) {
        try {
            // 在response中封装了请求ID,根据请求ID得到DefaultFuture(根据请求id通过remove方式获取DefaultFuture的好处是,获取的同时也清理了FUTURES中这个ID对应的请求信息,防止FUTURES堆积)
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                // 接收Reponse结果,这就是请求id对应的结果
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at " 
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                            + ", response " + response 
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                                + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    
    
    private void doReceived(Response res) {
        lock.lock();
        try {
            // 将reponse赋值给申明的:private volatile Response response;这就是请求id对应的结果
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
        
    

    3、超时请求清理

    对于那些耗时超过Consumer端timeout指定的值,且没有任何响应,dubbo如何处理呢?这些请求如果不处理的话,数据一致会积压在FUTURES这个Map中,dubbo采用的方法是在DefaultFuture中开启一个后台线程,死循环检测,源码如下:

    private static class RemotingInvocationTimeoutScan implements Runnable {
    
        public void run() {
            while (true) {
                try {
                    // 只要有请求,那么FUTURES就不为空,那么遍历这些请求
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        // 如果耗时超过了Consumer端指定的timeout,那么返回特定status值的Response(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT),Consumer拿到这种Response后,判断它是Response.SERVER_TIMEOUT or Response.CLIENT_TIMEOUT,从而抛出TimeoutException异常;
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            // handle response.
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }
    
    static {
        // 静态代码块,即初始化创建名为"DubboResponseTimeoutScanTimer"的线程来获取调用超时的请求,并返回特定status的Response
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
        th.setDaemon(true);
        th.start();
    }
    

    相关文章

      网友评论

          本文标题:9.dubbo源码-调用服务

          本文链接:https://www.haomeiwen.com/subject/lukjmxtx.html