美文网首页
Dubbo消费者调用提供者

Dubbo消费者调用提供者

作者: 你值得拥有更好的12138 | 来源:发表于2019-01-04 22:07 被阅读0次

一、消费者构造Invoker

消费者动态的构造Invoker是通过监听器,在注册中心服务变化后通过Directory调用DubboProtocol的refer方法来构造Invoker

@Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

在ReferenceConfig中通过inoker去生成代理类返回,在执行调用的时候去调用Invoker的invoker方法。
下面代码是ReferenConfig的createProxy方法的最后一步。

 if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            // make it possible for consumer to retry later if provider is temporarily unavailable
            initialized = false;
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
public interface Invoker<T> extends Node {

    /**
     * get service interface.
     *
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     *
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;

}

参见Dubbo消费者订阅,知为啥是DubboInover
在这里的Invoker的实现是DubboInvoker,这个类的invoke方法去调用子类DubboInvoker的doInvoke方法。

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                // For compatibility
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);

                Result result;
                if (isAsyncFuture) {
                    // register resultCallback, sometimes we need the asyn result being processed by the filter chain.
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

通过网络层的ExchangeClient 进行网络通信,获取结果result。区别异步和同步方式。到此消费者就发出请求了!

二、提供者接收请求

RegistryProtocol中export方法中调用对应的对应的注册中心类进行注册服务,而且还将Invoker传入DubboProtocol中进行服务的开启与监听接收的请求。

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    //此时的protocol由于url的变化已经不是RegistryProtocol,而是DubboProtocol
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

DubboProtocol中将Invoker中的url进行长连接的创建并开启端口监听。

 @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        //创建连接,并开启监听
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

二、提供者监听请求

在上述的openServer方法中进行连接创建并传入了一个回调对象,当接收到回调函数就去调用invoker的invoke方法,然后去实际调用具体对象的方法获取结果

 private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
           //传入requestHandler回调对象
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

相关文章

  • Dubbo如何支持本地调用?injvm方式解析

    Dubbo是一个远程调用的框架,对于一个服务提供者,暴露了一个接口供外部消费者调用,那么对于提供者自己是否可以调用...

  • Maven打包插件

    Dubbo做一个服务提供者,以Dubbo方法自提供的Main方法启动,供消费者调用,希望做到类似SpringBoo...

  • 2.Dubbo源码阅读-配置篇

    Dubbo的分支: 3.0Dubbo的服务提供者会将RPC服务的调用说明,导出到配置中心。然后服务的消费者向配置中...

  • dubbo源码分析-服务调用

    dubbo服务调用,就是服务消费者发起调用服务提供者接口的过程。是对前面服务导出,服务引用,服务集群、路由和负载均...

  • 看 Netty 在 Dubbo 中如何应用

    目录: dubbo 的 Consumer 消费者如何使用 Netty dubbo 的 Provider 提供者如何...

  • Dubbo剖析-监控平台的搭建与使用

    一、前言 dubbo-monitor主要用来统计服务的调用次调和调用时间的监控中心,服务消费者和提供者,在内存中累...

  • Dubbo消费者调用提供者

    一、消费者构造Invoker 消费者动态的构造Invoker是通过监听器,在注册中心服务变化后通过Director...

  • 从零创建一个DubboRPC框架

    准备一个消费者和一个提供者两个项目 一、在消费者项目上的Dubbo配置 1、在调用者(消费者consumer)的模...

  • dubbo配置-主要配置

    1,dubbo:application name:指定当前服务/应用名字(提供者/消费者) 2,dubbo:reg...

  • dubbo-admin管理平台搭建

    通过dubbo-admin可以对消费者和提供者进行管理 下载dubbo-admin dubbo-admin的下载,...

网友评论

      本文标题:Dubbo消费者调用提供者

      本文链接:https://www.haomeiwen.com/subject/zsntxqtx.html