美文网首页
dubbo消费端调用

dubbo消费端调用

作者: 剑道_7ffc | 来源:发表于2020-06-14 14:03 被阅读0次

    消费端调用的过程

    调用链

    关键方法

    InvokerInvocationHandler#invoke

    根据服务消费知道service注入的对象InvokerInvocationHandler(MockClusterWrapper(FailoverClusterInvoker(RegistryDirectory))),消费端调用queryPayList实际上调用的是动态生成的queryPayList方法,在生成的queryPayList方法来调用InvokerInvocationHandler的invoke方法。

    --动态生成的字节码
    public java.lang.String queryPayList() {
        Object[] args = new Object[0];
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
    
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
    
    MockClusterInvoker#invoke

    Mock用来服务降级的,若出现非业务异常则调用mock方法。

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
    
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {//没有mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {//业务异常
                    throw e;
                }
                result = doMockInvoke(invocation, e);//mock方法
            }
        }
        return result;
    }
    
    AbstractClusterInvoker#invoke
    public Result invoke(final Invocation invocation) throws RpcException {
        // binding attachments(附件) into invocation,通过附件可以追加一些功能如参数的隐式传递
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
        
        //做路由
        List<Invoker<T>> invokers = list(invocation);
        //初始化负载均衡
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }
    
    AbstractClusterInvoker#initLoadBalance
    protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
        //通过url的loadbalance来获取,默认是random
        if (CollectionUtils.isNotEmpty(invokers)) {
            return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
        } else {
            return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
        }
    }
    
    FailoverClusterInvoker#doInvoke
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        String methodName = RpcUtils.getMethodName(invocation);
        //retries的重试次数
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {//开始重试
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                copyInvokers = list(invocation);
            }
            //排除已经选择的或不可用的
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
    

    路由

    RouterChain#route

    路由主要是为了筛选出符合规则的服务提供方,主要有条件路由 ConditionRouter、脚本路由 ScriptRouter。可以在读写分离,应用隔离等。

    public List<Invoker<T>> route(URL url, Invocation invocation) {
        --在RegistryDirectory保存的invokers,表示服务端的服务地址
        List<Invoker<T>> finalInvokers = invokers;
        for (Router router : routers) {//路由规则
            finalInvokers = router.route(finalInvokers, url, invocation);
        }
        return finalInvokers;
    }
    

    负载均衡

    通过服务集合中选择合适的服务就是负载均衡。

    AbstractLoadBalance#select

    通过抽象工厂方法来实现负载均衡,默认的RandomLoadBalance

    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        return doSelect(invokers, url, invocation);
    }
    

    RandomLoadBalance#doSelect

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // Every invoker has the same weight?
        boolean sameWeight = true;
        // the weight of every invokers
        int[] weights = new int[length];
        // the first invoker's weight
        int firstWeight = getWeight(invokers.get(0), invocation);
        weights[0] = firstWeight;
        // The sum of weights
        int totalWeight = firstWeight;
        for (int i = 1; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // save for later use
            weights[i] = weight;
            // Sum
            totalWeight += weight;
            if (sameWeight && weight != firstWeight) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }
    

    可调用的Invoker初始化过程

    RegistryDirectory

    RegistryDirectory的成员属性private volatile Map<String, Invoker<T>> urlInvokerMap

    RegistryDirectory#toInvokers

    完成对urlInvokerMap的赋值

    for (URL providerUrl : urls) {//dubbo://
        URL url = mergeUrl(providerUrl);
    
        String key = url.toFullString(); // The parameter urls are sorted
        invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
        newUrlInvokerMap.put(key, invoker);
    }
    
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
    
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
    
        return invoker;
    }
    

    所以invoker是InvokerDelegate(ListenerInvokerWrapper(AsyncToSyncInvoker(DubboInvoker)))

    DubboInvoker

    AbstractInvoker#invoke

    把contextAttachments放入RpcInvocation中

    DubboInvoker#doInvoke

    protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            //interfaceName
            inv.setAttachment(PATH_KEY, getUrl().getPath());
            //版本号
            inv.setAttachment(VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);//超时时间
                if (isOneway) {//判断是否有返回值
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                } else {
                    AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                    CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                    responseFuture.whenComplete((obj, t) -> {
                        if (t != null) {
                            asyncRpcResult.completeExceptionally(t);
                        } else {
                            asyncRpcResult.complete((AppResponse) obj);
                        }
                    });
                    RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                    return asyncRpcResult;
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    currentClient.request

    currentClient是ReferenceCountExchangeClient(HeaderExchangeClient())对象,调用链路是ReferenceCountExchangeClient-->HeaderExchangeClient-->HeaderExchangeChannel,最后调用send方法

    public void send(Object message, boolean sent) throws RemotingException {
      
        Request request = new Request();
        request.setVersion(Version.getProtocolVersion());
        request.setTwoWay(false);
        request.setData(message);
            //NettyClient
        channel.send(request, sent);
    }
    

    NettyClient的channel属性是new DecodeHandler(new HeaderExchangeHandler(ExchangeHandlerAdapter))
    channel.send调用链路是AbstractClient.send->NettyChannel.send

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
    
        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
    
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
    

    相关文章

      网友评论

          本文标题:dubbo消费端调用

          本文链接:https://www.haomeiwen.com/subject/ppiwtktx.html