美文网首页Java学习笔记Dubbo 源码学习Dubbo
Dubbo 服务调用 源码学习(下)(七)

Dubbo 服务调用 源码学习(下)(七)

作者: jwfy | 来源:发表于2018-05-16 00:22 被阅读37次

    笔记简述
    本学习笔记接上篇Dubbo 服务调用 源码(上)学习(六),上一篇已经完成了invoker的生成,接下来就是具体的方法调用了,包含了mock测试、负载均衡(不涉及细节)、重试、netty调用、以及最后的结果等待和超时检测等几个步骤,依次操作,完成远程请求并获取结果的全过程操作。
    更多内容可看[目录]Dubbo 源码学习

    目录

    Dubbo 服务调用 源码(下)学习(七)
    1、InvokerInvocationHandler 入口
    2、MockClusterInvoker mock入口
    3、AbstractClusterInvoker 负载均衡
    4、FailoverClusterInvoker 重试机制
    5、DubboInvoker invoke
    6、NettyChannel netty请求
    7、Future 结果处理 & 超时检测

    根据动态代理的认识,最后反射执行的方法肯定到InvokerInvocationHandler类的invoke方法中

    1、InvokerInvocationHandler 入口

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
          // 获取方法的名称以及方法的参数信息
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 此时invoker是MockClusterInvoker
        // 还拼接生成了一个RpcInvocation
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
    

    来到了MockClusterInvoker类,此时需要注意到MockClusterInvoker类的invoke是FailoverClusterInvoker
    FailoverClusterInvoker类可以进行重试操作,如果有印象的可以知道在一个reference的xml配置中,可以加上重试次数retries属性字段的值,默认是3次,如果设置了小于0的数字,则为1次,重试次数0位的意思就是只进行一次操作

    2、MockClusterInvoker mock入口

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        
        // 注册中心,服务提供方、服务调用方的信息都存储在directory中,后期均衡负责也是处理这里面的数据
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
        // 就是查看 `methodName.mock`或者`mock`的属性值,默认是“false”
        if (value.length() == 0 || value.equalsIgnoreCase("false")){
            // 不需要走Mock测试,进入到FailoverClusterInvoker中
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " +  directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            }catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " +  directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
    

    3、AbstractClusterInvoker 负载均衡

    进入到FailoverClusterInvoker类之前先进入到AbstractClusterInvoker类中

    public Result invoke(final Invocation invocation) throws RpcException {
        checkWheatherDestoried();
        LoadBalance loadbalance;
        
        List<Invoker<T>> invokers = list(invocation);
        // 筛选出合适的invokers列表,基于方法和路由信息
        // 其中路由信息则是通过MockInvokersSelector类处理获取到invocation中attachments保存的mock信息去筛选合适的invoker,所以重点是筛选
        // 调试发现,一般情况下在这里面attachments字段并没有数据
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        // 创建合适的均衡负责类loanbalance信息,一般情况是RandomLoadBalance类
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 确保幂等,如果是异步则需要往attachment参数中添加自增ID(这个自增ID是AtomicLong类,线程安全)
        // 这里就有往invocation的attachment填充数据的操作
        return doInvoke(invocation, invokers, loadbalance);
        // 现在进入到FailoverClusterInvoker类中了
    }
    

    4、FailoverClusterInvoker 重试机制

    上面已经说了,这个类的主要作用是重试操作

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
          //  第一个是执行的参数信息,包含了函数名等信息
          // 第二个是被调用执行的参数,包含了服务提供方的IP:PORT信息
          // 第三个是均衡负责,在选择调用的服务方时,会根据该对象选择一个合适的服务方
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        // 检测invokers是否存在,如果不存在则提示没有可用的服务提供方被使用,请检查服务提供方是否被注册
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        // 获取重试的次数,如果设置的值<=0,则只有1次操作机会,默认是3次
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //重试时,进行重新选择,避免重试时invoker列表已发生变化.
            //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
            if (i > 0) {
                checkWheatherDestoried();
                copyinvokers = list(invocation);
                //重新检查一下
                // 注意一下这个list操作,这个list操作是重新更新可用的invoker列表
                checkInvokers(copyinvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            // 选择合适的服务提供方的invoker,在AbstractClusterInvoker类中去完成均衡负责的选择操作
            // 关于均衡负责,后面考虑分为一篇笔记去学习几种不同的负载方法,其中还包含了sticky 粘性连接
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                Result result = invoker.invoke(invocation);
                // 这一步才是真正的执行调用远程方法的开始&入口
                if (le != null && logger.isWarnEnabled()) {
                    // 存在重试了3次才终于成功的情况,这时候会告警提醒之前存在的错误信息输出
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers 
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                // 遇到了RPCEXCEPTION 而且是biz类型的,则不重试直接抛出该异常
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // 重试多次依旧没有正常的结果返回,则抛出该异常
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }
    

    5、DubboInvoker invoke

    上面说的Result result = invoker.invoke(invocation);,经过层层转发,来到了FutureFilter类

    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
        // 看是否为异步的方法,添加有sync字段信息
        // 先从invocation的attachment中查看是否存在async字段,再看看url中的methodName.async ,再看看url的async属性
        
        fireInvokeCallback(invoker, invocation);
        Result result = invoker.invoke(invocation);
        // 进一步invoke操作
        if (isAsync) {
            asyncCallback(invoker, invocation);
        } else {
            syncCallback(invoker, invocation, result);
        }
        return result;
    }
    

    来到了MonitorFilter过滤器查看是否需要进行监控(通过查看url是否存在monitor字段,如果为true,则是需要监控)

    再来到了AbstractInvoker类的invoke方法,本身是DubboInvoker

    public Result invoke(Invocation inv) throws RpcException {
        if(destroyed) {
            throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() 
                                            + " use dubbo version " + Version.getVersion()
                                            + " is DESTROYED, can not be invoked any more!");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            invocation.addAttachmentsIfAbsent(attachment);
            // 添加attachment信息,调试中发现添加的是interface和token
        }
        Map<String, String> context = RpcContext.getContext().getAttachments();
        // 这个是利用了ThreadLocal持有的数据中获取
        if (context != null) {
          // 这代码写的冗余了,而且为啥不再加个empty的检测呢?
            invocation.addAttachmentsIfAbsent(context);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            // 如果是异步的方法,添加async字段,
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 如果是异步则添加自增ID
        
        try {
            return doInvoke(invocation);
            // 进入到DubboInvoker执行invoke操作了
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                return new RpcResult(e);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return new RpcResult(te);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return new RpcResult(e);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }
    

    DubboInvoker 类

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        // 添加路径和版本概念,如果没有添加则是0.0.0 
        
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        // currentClient 是 后续需要连接netty操作的客户端
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // 是否为异步操作。。。。为啥确认个异步操作这么多重复操作
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // 是否设置了return=false 这个操作
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            // 超时设置的时间,默认为1s
            if (isOneway) {
                // 如果强制设置了return=false,异步的future都不需要设置了,也不需要关注超时字段
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                // 调用的是request方法,异步的设置future
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                // 同步方法,设置超时时间,等待返回
                // 其实也是异步方法,只是最后调用了get去获取future的结果
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

    6、NettyChannel netty请求

    上述的request以及send方法,都被转发到HeaderExchangeChannel类中,这个类有一个非常关键的字段是channel,是NettyClient类,包含了服务提供方的IP:PORT信息

    其实仔细看request方法和send方法最后的实现差不太多,只是request需要检测连接的channel是否存在,而send单独本身是不需要进行这个操作的。

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        // 生成的req有个线程安全的自增的ID,可以通过这个统计出调用的次数
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);
            // 进入到NettyChannel类中
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
        // 返回future,后续的超时就是通过对future操作
    }
    

    NettyChannel 类

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
        
        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.write(message);
            // 这个就是调用的netty的write操作完成数据发送操作
            // 这个就是经过层层嵌套包装向外发送数据的最终操作
            if (sent) {
               // url配置的send字段属性,如果为true
               // 则通过await等待超时的世界去查看请求是否成功
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
            // 抛出远程发送消息失败的错误,打印出发送参数以及远程IP
        }
        
        if(! success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
    

    7、Future 结果处理 & 超时检测

    看看异步拿到结果,判断是否超时等检测操作

    DefaultFuture 类

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            // 这个时候还是异步执行的,会立即执行到这里(时间非常的端,相比RPC的几百毫秒而言)
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                     // 时刻观察是否拿到response
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        // 如果拿到结果或者超时了,跳出循环
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                // 这个时候还没拿到结果,肯定是认为超时了,抛出TimeoutException
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
    
    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
             // 拿到的结果是无效的
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            // 这才是真的调用成功,返回数据了
            return res.getResult();
        }
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            // 客户端超时或者服务端超时,抛出TimeoutException
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        // 其他的就抛出RemotingException异常,并从res获取错误原因
        throw new RemotingException(channel, res.getErrorMessage());
    }
    

    至此整个的远程调用就全部结束了

    相关文章

      网友评论

        本文标题:Dubbo 服务调用 源码学习(下)(七)

        本文链接:https://www.haomeiwen.com/subject/jgsudftx.html