美文网首页
dubbo学习笔记

dubbo学习笔记

作者: 霹雳解锋镝 | 来源:发表于2019-01-12 16:41 被阅读0次

架构

dubbo架构图

节点 角色说明
Provider 暴露服务的服务提供方
Consumer 调用远程服务的服务消费方
Registry 服务注册与发现的注册中心
Monitor 统计服务的调用次数和调用时间的监控中心
Container 服务运行容器

Spring自定义标签即命令空间实现

//dubbo自定义标签与命名空间继承NamespaceHandlerSupport类
public class DubboNamespaceHandler extends NamespaceHandlerSupport {

static {
    Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
    // 基于xml配置
    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
    registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
    registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
    registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
    registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
    registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    // 基于注解配置
    registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
  }
 }

定义dubbo.xsd 文件在dubbo-config-spring模块下的 src/main/resouce/META-INF中分别定义dubbo.xsd、spring.handlers、spring.schemas。

Bean解析机制

//Spirng的配置支持xml配置文件与注解的方式,故Dubbo也支持两种配置方式
//BeanDefinitionParser:Spring定义的bean解析器,要实现自定义标签,则需要实现该接口,解析xml配置实例化bean
public class DubboBeanDefinitionParser implements BeanDefinitionParser {
//beanClass:该xml标签节点最终会被Spring实例化的类名。
//required:该标签的ID是否必须。
public DubboBeanDefinitionParser(Class<?> beanClass, boolean required) {
    this.beanClass = beanClass;
    this.required = required;
  }
}
//BeanDefinitionParser解析器的主要目的就是将上述标签,解析成对应的BeanDifinition,以便Spring构建上述类的实例。

2、服务提供者

//ServiceBean#afterPropertiesSet获取到配置实例  暴露服务
//包括 provider、application、module、RegistryConfig、monitor、ProtocolConfig和path
if (getProvider() == null) {  // @1
Map<String, ProviderConfig> provide
         ConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class,   false, false); // @2
          // ......  具体解析代码省略。
}
}

//启用延迟暴露机制
if (!isDelay()) {
 export();
}

private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
    delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay.intValue() == -1);
}
//如果有设置dubbo:service或dubbo:provider的属性delay,或配置delay为-1,都表示启用延迟机制,单位为毫秒,设置为-1,表示等到Spring容器初始化后再暴露服务。


//ServiceConfig#export 暴露服务 
public synchronized void export() {

if (provider != null) {
    if (export == null) {
        export = provider.getExport();
    }
    if (delay == null) {
        delay = provider.getDelay();
    }
}
//判断是否暴露服务dubbo:service export="true|false"
if (export != null && ! export.booleanValue()) {
    return;
}
if (delay != null && delay > 0) {
    Thread thread = new Thread(new Runnable() {
        public void run() {
            try {
                Thread.sleep(delay);
            } catch (Throwable e) {
            }
            doExport();
        }
    });
    thread.setDaemon(true);
    thread.setName("DelayExportServiceThread");
    thread.start();
} else {
    doExport();
}
}

//ServiceConfig#doExport暴露服务
protected synchronized void doExport() {

checkDefault();

.................
//校验ref与interface属性
if (ref instanceof GenericService) {
    interfaceClass = GenericService.class;
    if (StringUtils.isEmpty(generic)) {
        generic = Boolean.TRUE.toString();
    }
} else {
    try {
        interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                .getContextClassLoader());
    } catch (ClassNotFoundException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    checkInterfaceAndMethods(interfaceClass, methods);
    checkRef();
    generic = Boolean.FALSE.toString();
}
if(local !=null){
    if(local=="true"){
        local=interfaceName+"Local";
    }
    Class<?> localClass;
    try {
        localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
    } catch (ClassNotFoundException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    if(!interfaceClass.isAssignableFrom(localClass)){
        throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName);
    }
}
if(stub !=null){
    if(stub=="true"){
        stub=interfaceName+"Stub";
    }
    Class<?> stubClass;
    try {
        stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
    } catch (ClassNotFoundException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    if(!interfaceClass.isAssignableFrom(stubClass)){
        throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName);
    }
}
//校验ServiceBean的application、registry、protocol是否为空。并从系统属性(优先)、资源文件中填充其属性。
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
//校验stub、mock类的合理性,是否是interface的实现类
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
    path = interfaceName;
}

//暴露服务
doExportUrls();
}

//ServiceBean的provider属性为空,调用appendProperties方法,填充默认属性
private void checkDefault() {
if (provider == null) {
    provider = new ProviderConfig();
}
appendProperties(provider);
}

private void doExportUrls() {
//true,代表服务提供者,false:代表服务消费者,
List<URL> registryURLs = loadRegistries(true);
//遍历配置的所有协议,根据每个协议,向注册中心暴露服务
for (ProtocolConfig protocolConfig : protocols) {
    doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

//遍历ServiceBean的List< RegistryConfig> registries(所有注册中心的配置信息),然后将地址封装成URL对象。
protected List<URL> loadRegistries(boolean provider) {
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && registries.size() > 0) {
    for (RegistryConfig config : registries) {
        String address = config.getAddress();
        if (address == null || address.length() == 0) {
            address = Constants.ANYHOST_VALUE;
        }
        String sysaddress = System.getProperty("dubbo.registry.address");
        if (sysaddress != null && sysaddress.length() > 0) {
            address = sysaddress;
        }
        if (address != null && address.length() > 0 
                && ! RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
            Map<String, String> map = new HashMap<String, String>();
            appendParameters(map, application);
            appendParameters(map, config);
            map.put("path", RegistryService.class.getName());
            map.put("dubbo", Version.getVersion());
            map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
            if (ConfigUtils.getPid() > 0) {
                map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
            }
            if (! map.containsKey("protocol")) {
                if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                    map.put("protocol", "remote");
                } else {
                    map.put("protocol", "dubbo");
                }
            }
            List<URL> urls = UrlUtils.parseURLs(address, map);
            for (URL url : urls) {
                url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                //服务提供者配置了register="false"或者服务消费者,并配置了subscribe="false"则忽略该地址
                if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                        || (! provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                    registryList.add(url);
                }
            }
        }
    }
}
return registryList;
 }

//根据scope来暴露服务,如果scope不配置,则默认本地与远程都会暴露,如果配置成local或remote,那就只能是二选一
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

String scope = url.getParameter(Constants.SCOPE_KEY);
//配置为none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

    //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
    if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
        exportLocal(url);
    }
    //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
    if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        if (registryURLs != null && registryURLs.size() > 0
                && url.getParameter("register", true)) {
            for (URL registryURL : registryURLs) {
                //如果dubbo:service的dynamic属性未配置, 尝试取dubbo:registry的dynamic属性,该属性的作用是否启用动态注册,
                //如果设置为false,服务注册后,其状态显示为disable,需要人工启用,当服务不可用时,也不会自动移除,同样需要人工处理,此属性不要在生产环境上配置。
                url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                //构建监控中心的URL
                URL monitorUrl = loadMonitor(registryURL);
                if (monitorUrl != null) {
                    url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                }
                if (logger.isInfoEnabled()) {
                    logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                }
                //动态代理机制创建Invoker,dubbo的远程调用实现类
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                Exporter<?> exporter = protocol.export(invoker);
                exporters.add(exporter);
            }
        } else {
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

            Exporter<?> exporter = protocol.export(invoker);
            exporters.add(exporter);
        }
    }
}
this.urls.add(url);
}

//RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 暴露服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider 获取注册中心
final Registry registry = getRegistry(originInvoker);
// 获取服务url
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// 注册中心注册服务
registry.register(registedProviderUrl);
// 订阅override数据 服务提供者向注册中心订阅自己,若服务提供者URL发送变化后重新暴露服务
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }
    public void unexport() {
        try {
            exporter.unexport();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        try {
            registry.unregister(registedProviderUrl);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        try {
            overrideListeners.remove(overrideSubscribeUrl);
            registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
};
  }

//暴露服务
private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
    synchronized (bounds) {
        exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
            //DubboProtocol#export完成dubbo服务的启动
            exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
            bounds.put(key, exporter);
        }
    }
}
return (ExporterChangeableWrapper<T>) exporter;
}
//DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //获取服务提供者URL,以协议名称,如dubbo://
    URL url = invoker.getUrl();
    
    // export service. 从服务提供者URL中获取服务名
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    
    //export an stub service for dispaching event 是否将转发事件导出成stub
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice){
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
            if (logger.isWarnEnabled()){
                logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    //根据url打开服务
    openServer(url);

    // modified by lishen
    optimizeSerialization(url);

    return exporter;
}

//DubboProtocol#openServer
private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也可以暴露一个只有server可以调用的服务。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    
    //如果服务器已经存在,用当前URL重置服务器。因为一个Dubbo服务中,会存在多个dubbo:service标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            //server支持reset,配合override功能使用
            server.reset(url);
        }
    }
}

//DubboProtocol#createServer
private ExchangeServer createServer(URL url) {
    //默认开启server关闭时发送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默认开启heartbeat 默认为60*1000,表示60s。
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    
    //为服务提供者url增加server属性,可选值为netty,mina等等,默认为netty。
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
    //根据SPI机制,判断server属性是否支持
    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
    //为服务提供者url增加codec属性,默认值为dubbo,协议编码方式
    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        //根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    //验证客户端类型是否可用
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

// HeaderExchanger#bind
public class HeaderExchanger implements Exchanger {
    
    public static final String NAME = "header";
    
    // 服务消费者调用  
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
    
    // 服务提供者调用
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

// Transporters.bind Dubbo网络传输的接口有Transporter接口实现
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().bind(url, handler);
}

public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

3、Dubbo服务消费端启动流程
Dubbo服务消费者标签dubbo:reference,Spring容器中创建ReferenceBean实例,该实例实现了Spring生命周期接口:InitializingBean。

//ReferenceBean#afterPropertiesSet
public void afterPropertiesSet() throws Exception {
    // 获取consumerConfig、applicationConfig、moduleConfig、registryConfigs和monitorConfig实例
    if (getConsumer() == null) {
        Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null  : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
        if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
            ConsumerConfig consumerConfig = null;
            for (ConsumerConfig config : consumerConfigMap.values()) {
                if (config.isDefault() == null || config.isDefault().booleanValue()) {
                    if (consumerConfig != null) {
                        throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
                    }
                    consumerConfig = config;
                }
            }
            if (consumerConfig != null) {
                setConsumer(consumerConfig);
            }
        }
    }
    // 判断是否初始化
    Boolean b = isInit();
    if (b == null && getConsumer() != null) {
        b = getConsumer().isInit();
    }
    if (b != null && b.booleanValue()) {
        // 调用父类ReferenceConfig中的get()方法,在调用init()方法
        getObject();
    }
}

//ReferenceConfig#init
private void init() {
    // 若初始化直接返回
    if (initialized) {
        return;
    }
    initialized = true;
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
    }
    // 获取消费者全局配置
    checkDefault();
    // 加载属性配置文件的值 dubbo.XXX
    appendProperties(this);
    if (getGeneric() == null && getConsumer() != null) {
        setGeneric(getConsumer().getGeneric());
    }
    if (ProtocolUtils.isGeneric(getGeneric())) {
        // 如果使用返回引用,将interface值替换为GenericService全路径名
        interfaceClass = GenericService.class;
    } else {
        try {
            // 加载interfacename
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // dubbo:method引用的方法是否在interface指定的接口中存在
        checkInterfaceAndMethods(interfaceClass, methods);
    }
    
    // dubbo服务消费端resolve机制,消息消费者绕过注册中心只连服务提供者  -Dinterface=dubbo://127.0.0.1:20880,其中interface为dubbo:reference interface属性的值
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    
    //-Ddubbo.resolve.file=文件路径名来指定
    if (resolve == null || resolve.length() == 0) {
        resolveFile = System.getProperty("dubbo.resolve.file");
        if (resolveFile == null || resolveFile.length() == 0) {
            File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
            if (userResolveFile.exists()) {
                resolveFile = userResolveFile.getAbsolutePath();
            }
        }
        if (resolveFile != null && resolveFile.length() > 0) {
            Properties properties = new Properties();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File(resolveFile));
                properties.load(fis);
            } catch (IOException e) {
                throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
            } finally {
                try {
                    if(null != fis) fis.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
            resolve = properties.getProperty(interfaceName);
        }
    }
    // 如果resolve不为空,则填充ReferenceBean的url属性为resolve(点对点服务提供者URL)
    if (resolve != null && resolve.length() > 0) {
        url = resolve;
        if (logger.isWarnEnabled()) {
            if (resolveFile != null && resolveFile.length() > 0) {
                logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
            } else {
                logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
            }
        }
    }
    if (consumer != null) {
        if (application == null) {
            application = consumer.getApplication();
        }
        if (module == null) {
            module = consumer.getModule();
        }
        if (registries == null) {
            registries = consumer.getRegistries();
        }
        if (monitor == null) {
            monitor = consumer.getMonitor();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    
    // 校验ReferenceBean的application是否为空,如果为空
    checkApplication();
    
    // 校验stub、mock实现类与interface的兼容性
    checkStubAndMock(interfaceClass);
    
    // 封装服务消费者引用服务提供者URL的属性,这里主要填充side:consume(消费端)、dubbo:2.0.0(版本)、timestamp、pid:进程ID。
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    
    // 如果不是泛化引用,增加methods:interface的所有方法名,多个用逗号隔开
    if (! isGeneric()) {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }

        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if(methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        }
        else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    
    // 存储application配置、module配置、默认消费者参数(ConsumerConfig)、服务消费者dubbo:reference的属性
    map.put(Constants.INTERFACE_KEY, interfaceName);
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    
    // 
    String prifix = StringUtils.getServiceKey(map);
    if (methods != null && methods.size() > 0) {
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            appendAttributes(attributes, method, prifix + "." + method.getName());
            checkAndConvertImplicitConfig(method, map, attributes);
        }
    }
    //attributes通过系统context进行存储.
    StaticContext.getSystemContext().putAll(attributes);
    
    // 创建消息消费者代理
    ref = createProxy(map);
}

//创建消息消费者代理
private T createProxy(Map<String, String> map) {
    
    // 判断该消费者是否是引用本(JVM)内提供的服务
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    if (isInjvm() == null) {
        if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
            isJvmRefer = false;
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            //默认情况下如果本地有服务暴露,则引用本地服务.
            isJvmRefer = true;
        } else {
            isJvmRefer = false;
        }
    } else {
        isJvmRefer = isInjvm().booleanValue();
    }
    
    if (isJvmRefer) {
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        
        //本地JVM中的服务,则利用InjvmProtocol创建Invoker
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        url = url.setPath(interfaceName);
                    }
                    
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    
                        // 如果直连提供者的协议为registry,则对url增加refer属性,其值为消息消费者所有的属性。(表示从注册中心发现服务提供者)
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                    
                        // 如果是其他协议提供者,则合并服务提供者与消息消费者的属性,并移除服务提供者默认属性。以default开头的属性。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // 通过注册中心配置拼装URL 普通消息消费者,从注册中心订阅服务
            //获取所有注册中心URL,其中参数false表示消费端 需要排除dubbo:registry subscribe=false的注册中心,其值为false表示不接受订阅
            List<URL> us = loadRegistries(false); 
            if (us != null && us.size() > 0) {
                for (URL u : us) {
                    // 根据注册中心URL,构建监控中心URL
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {//在注册中心URL后增加属性monitor
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // 在注册中心URL中,追加属性refer,其值为消费端的所有配置组成的URL
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls == null || urls.size() == 0) {
                throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
            }
        }
        
        // 如果只有一个服务提供者URL,则直接根据协议构建Invoker
        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {// 若有多个服务提供者可构成一个集群
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // 用了最后一个registry url
                }
            }
            if (registryURL != null) { // 有 注册中心协议的URL 集群模式实现的Invoker
                // 对有注册中心的Cluster 只用 AvailableCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                invoker = cluster.join(new StaticDirectory(u, invokers));
            }  else { // 不是 注册中心的URL
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true; // default true
    }
    // dubbo:referecnce的check=true或默认为空,则需要判断服务提供者是否存在
    if (c && ! invoker.isAvailable()) {
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    // 创建服务代理
    return (T) proxyFactory.getProxy(invoker);
}

// AbstractProxyFactory#getProxy
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    Class<?>[] interfaces = null;
    
    // 从消费者URL中获取interfaces的值,用,分隔出单个服务应用接口
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
            interfaces = new Class<?>[types.length + 2];
            interfaces[0] = invoker.getInterface();
            
            // 增加默认接口EchoService接口
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i ++) {
                interfaces[i + 1] = ReflectUtils.forName(types[i]);
            }
        }
    }
    if (interfaces == null) {
        interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};
    }
    // 根据需要实现的接口,使用jdk或Javassist创建代理类
    return getProxy(invoker, interfaces);
}

参考:
https://blog.csdn.net/prestigeding/article/details/80637239

相关文章

  • Dubbo 服务调用 总结(八)

    笔记简述结合之前学习的两篇笔记 Dubbo 服务调用 源码学习(上)(六)和 Dubbo 服务调用 源码学习(下)...

  • Dubbo 服务暴露 总结(五)

    笔记简述Dubbo服务暴露之前分为了两小节Dubbo 服务暴露 源码学习(上)(三) 和Dubbo 服务暴露 源码...

  • Dubbo 2.7.7 学习笔记 1

    Dubbo 2.7.7 起步入门学习笔记 本文档基于 Dubbo 中文官方文档编写,详情请参见 Dubbo 中文官...

  • Dubbo | Dubbo快速上手笔记 - 环境与配置

    前言 比较基础的dubbo学习笔记,一些参考资料如下: 尚硅谷Dubbo教程(dubbo经典之作)[https:/...

  • Dubbo 简要介绍和使用 学习(一)

    笔记简述本学习笔记主要是介绍了dubbo的基础内容,简单说明了dubbo、rpc、soa、zk等概念,并没有直接贴...

  • Dubbo SPI 源码学习 & admin安装(二)

    笔记简述本学习笔记主要是介绍了SPI的使用以及原理,dubbo是如何实现自身的SPI,dubbo如何使用的可以看D...

  • Dubbo学习笔记

    Dubbo简介 Dubbo是Alibaba开源的分布式服务框架,它最大的特点是按照分层的方式来架构,使用这种方式可...

  • Dubbo学习笔记

    1. Dubbo入门 Dubbo是一款高性能Java RPC框架,常用来构建分布式系统。Dubbo为开发者提供了三...

  • Dubbo学习笔记

    0 写在前面 项目使用Dubbo近一年了,期间大大小小遇到过一些问题。在这个过程中也一直在尝试阅读Dubbo源码,...

  • Dubbo学习笔记

    [TOC] 一、分布式基本知识 1.1) 架构演变 先给出dubbo官方的图,图片表示了架构的演变。然后我说一下自...

网友评论

      本文标题:dubbo学习笔记

      本文链接:https://www.haomeiwen.com/subject/ubsgrqtx.html