美文网首页
dubbo系列之-服务暴露-2021-01-16

dubbo系列之-服务暴露-2021-01-16

作者: five_year | 来源:发表于2021-01-16 23:46 被阅读0次

    背景

    服务暴露网上已经有很多文章了,大而全,我们这里主要抓细节😄。

    image

    疑问

    暴露过程做了些啥?

    是先启动服务还是先连接注册中心?

    服务下线时,注册中心是怎么感知到的?

    暴露

    我们从 org.apache.dubbo.config.ServiceConfig#doExportUrls() 方法进去

    /**
     * Exports this service once for every configured protocol.
     *
     * <p>Several {@code <dubbo:protocol>} entries may be configured, for example
     * {@code <dubbo:protocol name="dubbo" port="20880"/>} together with
     * {@code <dubbo:protocol name="rest" port="20881"/>}, so that e.g. PHP clients and
     * Dubbo clients can be served at the same time.
     */
    private void doExportUrls() {
        final List<URL> registries = loadRegistries(true);
        protocols.forEach(config -> doExportUrlsFor1Protocol(config, registries));
    }
    
    

    进入 doExportUrlsFor1Protocol()中,这个方法大家一定要进去瞅一眼,和我们写的代码也差不多,方法长度太长,而且循环嵌套很深。

    //org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
    /**
     * Exports the service for one protocol.
     *
     * <p>Collects the configuration into a parameter map, builds the provider URL from it,
     * then, depending on the URL's scope parameter, exports the service locally and/or
     * remotely (once per registry URL, or directly when no registry is configured), and
     * finally records the URL in {@code urls}.
     *
     * @param protocolConfig the protocol to export with
     * @param registryURLs   the registry URLs to export to; may be empty
     */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {// no protocol name configured: default to dubbo
            name = DUBBO;
        }
    
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);
        // put every piece of configuration into the URL as key=value parameters
        appendRuntimeParameters(map);
        appendParameters(map, metrics);
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        if (ProtocolUtils.isGeneric(generic)) {// generic service
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {// regular service: record the revision and the method names
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }
    
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        // token: Dubbo supports token validation; only calls carrying the right token succeed
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(TOKEN_KEY, token);
            }
        }
    
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
    
        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
           if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);// export the service locally first (analyzed further below)
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    // several registries are supported too, e.g. the service can be exposed
                    // inside the cluster and also to a shared platform used by other business lines
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        // load the monitor configuration
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
    
                        // proxy mode used to invoke the concrete bean (javassist by default)
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        // build the invoker; the provider URL travels as the EXPORT_KEY parameter of the registry URL
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // export the service
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    // no registry configured: export directly with the provider URL
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                // store the published provider information
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);
    }
    
    

    本地暴露 exportLocal(url)

    //org.apache.dubbo.config.ServiceConfig#exportLocal
    /**
     * Exports the service inside the current JVM.
     *
     * <p>A copy of {@code url} is built with the protocol switched to {@code LOCAL_PROTOCOL}
     * (injvm), the host set to {@code LOCALHOST_VALUE} and the port set to 0; the protocol is
     * set by hand so that the adaptive {@code protocol} picks the matching implementation.
     * The resulting exporter is added to {@code exporters}.
     *
     * @param url the provider URL the local URL is derived from
     */
    private void exportLocal(URL url) {
        final URL injvmUrl = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        final Invoker<?> localInvoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, injvmUrl);
        exporters.add(protocol.export(localInvoker));
    }
    // Adaptive extension of Protocol, obtained once from the ExtensionLoader.
    static Protocol protocol = ExtensionLoader
    .getExtensionLoader(Protocol.class).getAdaptiveExtension();
    
    // Adaptive extension of ProxyFactory, obtained once from the ExtensionLoader.
    static ProxyFactory PROXY_FACTORY = ExtensionLoader
    .getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    
    

    protocol 静态变量为 Protocol 接口的自适应扩展点,调用 protocol.export(Invoker<T> invoker) 将会根据传入的invoker 信息决定去往哪个实现类。而 invoker 传入的值为

    PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local),PROXY_FACTORY 静态变量也是一个 ProxyFactory 的自适应扩展点。从下面可以看到,该扩展点是方法级别的自适应扩展点(@Adaptive 标注在方法上);这里我们并没有自定义过 proxy 属性,所以使用默认实现 javassist=JavassistProxyFactory(这里忽略各种包装器)。

    /**
     * Extension point that turns a service implementation into an {@code Invoker}.
     *
     * <p>NOTE(review): according to the surrounding article, the {@code @SPI("javassist")}
     * value makes JavassistProxyFactory the default implementation — confirm against the
     * Dubbo SPI documentation.
     */
    @SPI("javassist")
    public interface ProxyFactory {
        // NOTE(review): presumably the "proxy" URL parameter selects the implementation at call time — confirm.
        @Adaptive({"proxy"})
        <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) ;
    }
    
    

    进到JavassistProxyFactory 的 getInvoker实现中。

    //org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
    /**
     * Builds an invoker that dispatches calls to {@code proxy} through a {@code Wrapper}.
     *
     * @param proxy the real service implementation, e.g. HelloServiceImpl@xxx
     * @param type  the service interface, e.g. HelloService
     * @param url   the URL handed to the created invoker
     * @return an anonymous {@code AbstractProxyInvoker} whose {@code doInvoke} delegates to the wrapper
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // When the implementation's class name contains '$' (e.g. it is itself a generated
        // proxy class), only the interface type is wrapped; otherwise the concrete class is.
        final boolean plainClass = !proxy.getClass().getName().contains("$");
        final Class<?> wrappedType = plainClass ? proxy.getClass() : type;
        // Wrap the target class in a Wrapper (per the article, generated with javassist by default).
        final Wrapper methodDispatcher = Wrapper.getWrapper(wrappedType);
        // AbstractProxyInvoker implements Invoker; the anonymous subclass keeps a reference to the wrapper.
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T target, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return methodDispatcher.invokeMethod(target, methodName, parameterTypes, arguments);
            }
        };
    }
    
    

    上面这种匿名的写法可能不够具体,我们通过自定义类的方式去实现它,更具象点

    //org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
    /**
     * Same behavior as the anonymous-class version of {@code getInvoker}, but returning a
     * named invoker class so the wrapping chain is easier to follow.
     *
     * @param proxy the real service implementation
     * @param type  the service interface
     * @param url   the URL handed to the created invoker
     * @return a {@code MyProxyInvoker} delegating to a {@code Wrapper} of the target class
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new MyProxyInvoker<>(proxy, type, url, wrapper);
    }

    /**
     * Named equivalent of the anonymous {@code AbstractProxyInvoker} subclass.
     * It is generic in the service type so no raw types or unchecked conversions are needed.
     *
     * @param <T> the service interface type
     */
    public class MyProxyInvoker<T> extends AbstractProxyInvoker<T> {
        // Dispatches a method name + arguments to the wrapped target; never reassigned.
        private final Wrapper wrapper;

        public MyProxyInvoker(T proxy, Class<T> type, URL url, Wrapper wrapper) {
            super(proxy, type, url);
            this.wrapper = wrapper;
        }

        @Override
        protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    }
    
    

    这样写的效果是一样的:JavassistProxyFactory#getInvoker() 方法返回的是 MyProxyInvoker 对象,后面我们就用该对象来描述分析。

    回到 Exporter<?> exporter = protocol.export(PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local)); 中,表达式变成了 Exporter<?> exporter = protocol.export(MyProxyInvoker),MyProxyInvoker 中的 url 对象为 local:

    // Same URL as built in exportLocal, with the protocol written out literally as "injvm".
    URL local = URLBuilder.from(url)
            .setProtocol("injvm")
            .setHost(LOCALHOST_VALUE)
            .setPort(0)
            .build();
    
    

    所以protocol.export()的实现类为InjvmProtocol

    /**
     * Exports the invoker inside the JVM by wrapping it in an {@code InjvmExporter}
     * keyed by the service key of the invoker's URL.
     *
     * @param invoker the invoker to export
     * @return the created {@code InjvmExporter}
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        final String serviceKey = invoker.getUrl().getServiceKey();
        return new InjvmExporter<>(invoker, serviceKey, exporterMap);
    }
    
    

    该方法返回 InjvmExporter,最后执行 exporters.add(exporter),将 InjvmExporter(这里其实外面会包装一层 ListenerExporterWrapper 包装器)对象保存到 exporters 集合中,至此 jvm 本地暴露结束。

    远程暴露

    我们再来看看远程暴露的区别

    //同本地暴露一样返回MyProxyInvoker实例
    // As with local export, this returns a MyProxyInvoker instance.
    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
    // Unlike local export, the MyProxyInvoker instance is wrapped in a DelegateProviderMetaDataInvoker.
    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    // Same call as in local export; the difference is that the protocol of wrapperInvoker's URL is not injvm.
    Exporter<?> exporter = protocol.export(wrapperInvoker);
    exporters.add(exporter);
    
    

    我们 debug 看看 wrapperInvoker 中的 url#protocol 是啥

    image

    Protocol 为registry,所以流程会进入到 RegistryProtocol#export(同样这里也会有Wrapper包装)我们debug进去,这个方法内容太丰富了,这里我们先只分析服务暴露

    /**
     * Registry-protocol export (excerpt): resolves the registry and provider URLs, installs an
     * override listener for the provider URL, exports the service via {@code doLocalExport} and
     * returns the result wrapped in a {@code DestroyableExporter}.
     * The remaining steps of the original method are omitted in this excerpt.
     */
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // the URL that will be exposed to the registry
        URL providerUrl = getProviderUrl(originInvoker);
    
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        // export the service (analyzed below)
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
        // ... (rest of the method omitted in this excerpt)
        return new DestroyableExporter<>(exporter);
    }
    
    
    暴露服务 doLocalExport()
    //org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
    /**
     * Exports the service with the provider URL, at most once per cache key.
     *
     * @param originInvoker the invoker received by the registry protocol
     * @param providerUrl   the provider URL (dubbo://xxx in the article's example)
     * @return the exporter cached in {@code bounds} under the invoker's cache key
     */
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        // A unique key per exported service avoids exporting the same service twice.
        final String cacheKey = getCacheKey(originInvoker);
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(cacheKey, unusedKey -> {
            // Wrap the invoker once more, this time carrying the provider URL.
            final Invoker<?> delegate = new InvokerDelegate<>(originInvoker, providerUrl);
            // Per the article, protocol.export passes through several wrappers and ends up in DubboProtocol#export.
            final Exporter<T> exported = (Exporter<T>) protocol.export(delegate);
            return new ExporterChangeableWrapper<>(exported, originInvoker);
        });
    }
    /**
     * Builds the cache key of an exported service: the full string of its provider URL
     * with the "dynamic" and "enabled" parameters removed.
     */
    private String getCacheKey(final Invoker<?> originInvoker) {
        return getProviderUrl(originInvoker)
                .removeParameters("dynamic", "enabled")
                .toFullString();
    }
    
    

    这里originInvoker为 DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)),invokerDelegate再次包装为InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))),我们继续debug,到了ProtocolFilterWrapper#export

    @Override
    /**
     * Exports the invoker through the wrapped protocol. Invokers whose URL uses the
     * registry protocol are passed through untouched; all others are first wrapped
     * in the provider-side filter chain.
     */
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        final boolean registryUrl = REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
        final Invoker<T> target = registryUrl
                ? invoker
                : buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER);
        return protocol.export(target);
    }
    /**
     * Builds the filter chain around {@code invoker} (excerpt).
     * Loads the activated {@code Filter} extensions for the invoker's URL, key and group,
     * walks them from last to first, and returns a {@code CallbackRegistrationInvoker}
     * built from the resulting head invoker and the filter list.
     * NOTE(review): the loop body is elided in this excerpt, so the braces below do not balance.
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
               // ..... (loop body elided in this excerpt; the "};" below is what remains of it)
                };
            }
        }
        return new CallbackRegistrationInvoker<>(last, filters);
    }
    
    

    buildInvokerChain()会将InvokerDelegate关联多个Filter过滤器,然后包装为CallbackRegistrationInvoker对象返回,我们接着debug,最后到了DubboProtocol#export(),此时的invoker为CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))))

    image
    //org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
    /**
     * Dubbo-protocol export: registers the invoker in {@code exporterMap} under its service key,
     * records stub event methods when configured, opens the server for the URL and applies the
     * serialization optimization.
     *
     * @param invoker the (already wrapped) invoker to export
     * @return the {@code DubboExporter} stored in {@code exporterMap}
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // build the service key, e.g. com.poizon.study.api.service.HelloService:20880; it does not depend on the method
        String key = serviceKey(url);
        // wrap the CallbackRegistrationInvoker in a DubboExporter and store it in the map;
        // this map is crucial: it is the entry point for locating the service when a call arrives
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
    
        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
    
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        // start the server, i.e. bring up netty listening on the port (20880 in this example)
        openServer(url);
        // load the configured serialization optimizer (per the article, hessian2 is the default serialization)
        optimizeSerialization(url);
        return exporter;
    }
    //org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
    // Opens the server for the given URL (excerpt).
    // NOTE(review): the code that computes `key` is elided in this excerpt.
    private void openServer(URL url) {
        // ..... (elided) createServer() creates the server, which is stored in serverMap under key
       serverMap.put(key, createServer(url));
    }
    //org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
    /**
     * Binds an exchange server for {@code url} using {@code requestHandler}.
     *
     * @throws RpcException wrapping the {@code RemotingException} when binding fails
     */
    private ExchangeServer createServer(URL url) {
        try {
            return Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
    }
    //org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
    /**
     * Binds a transport-level server for {@code url} and wraps it in a {@code HeaderExchangeServer}.
     * The handler is decorated as DecodeHandler(HeaderExchangeHandler(handler)) before binding.
     */
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        final DecodeHandler decodingHandler = new DecodeHandler(new HeaderExchangeHandler(handler));
        return new HeaderExchangeServer(Transporters.bind(url, decodingHandler));
    }
    //org....remoting.Transporters#bind(URL, ChannelHandler...)
    /**
     * Binds a server for {@code url} through the current transporter.
     * A single handler is used as is; any other number of handlers is combined
     * in a {@code ChannelHandlerDispatcher}.
     */
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        final ChannelHandler effectiveHandler = handlers.length == 1
                ? handlers[0]
                : new ChannelHandlerDispatcher(handlers);
        // Per the article, the netty4 implementation is selected by default.
        return getTransporter().bind(url, effectiveHandler);
    }
    
    //org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect
    /**
     * Creates a {@code NettyClient} for the given URL and handler.
     * NOTE(review): this is the transporter's connect (client) method, whereas Transporters#bind
     * above calls {@code getTransporter().bind(...)} — confirm this is the excerpt intended here.
     */
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
    //org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
        try {
            doOpen();
    //org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen     
    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class);
    
        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }
    
        bootstrap.handler(new ChannelInitializer() {
    
            @Override
            protected void initChannel(Channel ch) throws Exception {
    
    

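    跟到最后看到了熟悉的netty启动,这里有好多我们熟悉的配置,比如第一篇文章我们说到的心跳实现IdleStateHandler,以及心跳默认时间 UrlUtils.getHeartbeat(getUrl()),还有netty 的自定义handler nettyClientHandler。(注意:上面贴出的 NettyTransporter#connect / NettyClient#doOpen 其实是消费者一侧建立连接的代码;服务暴露时 Transporters.bind() 调用的是 getTransporter().bind(),即 NettyTransporter#bind → NettyServer#doOpen,它使用 ServerBootstrap 监听端口,对应的自定义 handler 是 NettyServerHandler,由它来处理消费者发来的请求,整体结构与上面的客户端代码类似。)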

    总结

    总结下,我们一根线走到底,走到了最后的socket启动,最后将 DubboExporter 放入了map中,最后层层包装为 DestroyableExporter(ExporterChangeableWrapper(ListenerExporterWrapper(DubboExporter(CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))))))));嵌套虽然多了点,但是 Wrapper 类的功能都是为了扩展小功能,后面我们挑几个分析。

    image

    后面将分析注册中心和Wrapper 等功能。

    相关文章

      网友评论

          本文标题:dubbo系列之-服务暴露-2021-01-16

          本文链接:https://www.haomeiwen.com/subject/ainoaktx.html