架构
dubbo架构图节点 角色说明
Provider 暴露服务的服务提供方
Consumer 调用远程服务的服务消费方
Registry 服务注册与发现的注册中心
Monitor 统计服务的调用次数和调用时间的监控中心
Container 服务运行容器
Spring自定义标签即命令空间实现
//dubbo自定义标签与命名空间继承NamespaceHandlerSupport类
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
// 基于xml配置
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
// 基于注解配置
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
定义dubbo.xsd 文件在dubbo-config-spring模块下的 src/main/resouce/META-INF中分别定义dubbo.xsd、spring.handlers、spring.schemas。
Bean解析机制
//Spirng的配置支持xml配置文件与注解的方式,故Dubbo也支持两种配置方式
//BeanDefinitionParser:Spring定义的bean解析器,要实现自定义标签,则需要实现该接口,解析xml配置实例化bean
public class DubboBeanDefinitionParser implements BeanDefinitionParser {
//beanClass:该xml标签节点最终会被Spring实例化的类名。
//required:该标签的ID是否必须。
public DubboBeanDefinitionParser(Class<?> beanClass, boolean required) {
this.beanClass = beanClass;
this.required = required;
}
}
//BeanDefinitionParser解析器的主要目的就是将上述标签,解析成对应的BeanDifinition,以便Spring构建上述类的实例。
2、服务提供者
//ServiceBean#afterPropertiesSet获取到配置实例 暴露服务
//包括 provider、application、module、RegistryConfig、monitor、ProtocolConfig和path
if (getProvider() == null) { // @1
Map<String, ProviderConfig> provide
ConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false); // @2
// ...... 具体解析代码省略。
}
}
//启用延迟暴露机制
if (!isDelay()) {
export();
}
private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay.intValue() == -1);
}
//如果有设置dubbo:service或dubbo:provider的属性delay,或配置delay为-1,都表示启用延迟机制,单位为毫秒,设置为-1,表示等到Spring容器初始化后再暴露服务。
//ServiceConfig#export 暴露服务
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
//判断是否暴露服务dubbo:service export="true|false"
if (export != null && ! export.booleanValue()) {
return;
}
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();
}
}
//ServiceConfig#doExport暴露服务
protected synchronized void doExport() {
checkDefault();
.................
//校验ref与interface属性
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
checkRef();
generic = Boolean.FALSE.toString();
}
if(local !=null){
if(local=="true"){
local=interfaceName+"Local";
}
Class<?> localClass;
try {
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if(!interfaceClass.isAssignableFrom(localClass)){
throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if(stub !=null){
if(stub=="true"){
stub=interfaceName+"Stub";
}
Class<?> stubClass;
try {
stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if(!interfaceClass.isAssignableFrom(stubClass)){
throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
//校验ServiceBean的application、registry、protocol是否为空。并从系统属性(优先)、资源文件中填充其属性。
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
//校验stub、mock类的合理性,是否是interface的实现类
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
//暴露服务
doExportUrls();
}
//ServiceBean的provider属性为空,调用appendProperties方法,填充默认属性
private void checkDefault() {
if (provider == null) {
provider = new ProviderConfig();
}
appendProperties(provider);
}
private void doExportUrls() {
//true,代表服务提供者,false:代表服务消费者,
List<URL> registryURLs = loadRegistries(true);
//遍历配置的所有协议,根据每个协议,向注册中心暴露服务
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
//遍历ServiceBean的List< RegistryConfig> registries(所有注册中心的配置信息),然后将地址封装成URL对象。
protected List<URL> loadRegistries(boolean provider) {
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && registries.size() > 0) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
}
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address != null && address.length() > 0
&& ! RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (! map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
//服务提供者配置了register="false"或者服务消费者,并配置了subscribe="false"则忽略该地址
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (! provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
//根据scope来暴露服务,如果scope不配置,则默认本地与远程都会暴露,如果配置成local或remote,那就只能是二选一
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String scope = url.getParameter(Constants.SCOPE_KEY);
//配置为none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
//如果dubbo:service的dynamic属性未配置, 尝试取dubbo:registry的dynamic属性,该属性的作用是否启用动态注册,
//如果设置为false,服务注册后,其状态显示为disable,需要人工启用,当服务不可用时,也不会自动移除,同样需要人工处理,此属性不要在生产环境上配置。
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
//构建监控中心的URL
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//动态代理机制创建Invoker,dubbo的远程调用实现类
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
//RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 暴露服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider 获取注册中心
final Registry registry = getRegistry(originInvoker);
// 获取服务url
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// 注册中心注册服务
registry.register(registedProviderUrl);
// 订阅override数据 服务提供者向注册中心订阅自己,若服务提供者URL发送变化后重新暴露服务
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
//暴露服务
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//DubboProtocol#export完成dubbo服务的启动
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}
//DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//获取服务提供者URL,以协议名称,如dubbo://
URL url = invoker.getUrl();
// export service. 从服务提供者URL中获取服务名
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispaching event 是否将转发事件导出成stub
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice){
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
if (logger.isWarnEnabled()){
logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//根据url打开服务
openServer(url);
// modified by lishen
optimizeSerialization(url);
return exporter;
}
//DubboProtocol#openServer
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
//如果服务器已经存在,用当前URL重置服务器。因为一个Dubbo服务中,会存在多个dubbo:service标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
//DubboProtocol#createServer
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat 默认为60*1000,表示60s。
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
//为服务提供者url增加server属性,可选值为netty,mina等等,默认为netty。
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
//根据SPI机制,判断server属性是否支持
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
//为服务提供者url增加codec属性,默认值为dubbo,协议编码方式
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
//根据服务提供者URI,服务提供者命令请求处理器requestHandler构建ExchangeServer实例
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//验证客户端类型是否可用
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
// HeaderExchanger#bind
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
// 服务消费者调用
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
// 服务提供者调用
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
// Transporters.bind Dubbo网络传输的接口有Transporter接口实现
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
3、Dubbo服务消费端启动流程
Dubbo服务消费者标签dubbo:reference,Spring容器中创建ReferenceBean实例,该实例实现了Spring生命周期接口:InitializingBean。
//ReferenceBean#afterPropertiesSet
public void afterPropertiesSet() throws Exception {
// 获取consumerConfig、applicationConfig、moduleConfig、registryConfigs和monitorConfig实例
if (getConsumer() == null) {
Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
ConsumerConfig consumerConfig = null;
for (ConsumerConfig config : consumerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (consumerConfig != null) {
throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
}
consumerConfig = config;
}
}
if (consumerConfig != null) {
setConsumer(consumerConfig);
}
}
}
// 判断是否初始化
Boolean b = isInit();
if (b == null && getConsumer() != null) {
b = getConsumer().isInit();
}
if (b != null && b.booleanValue()) {
// 调用父类ReferenceConfig中的get()方法,在调用init()方法
getObject();
}
}
//ReferenceConfig#init
private void init() {
// 若初始化直接返回
if (initialized) {
return;
}
initialized = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
// 获取消费者全局配置
checkDefault();
// 加载属性配置文件的值 dubbo.XXX
appendProperties(this);
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(getGeneric())) {
// 如果使用返回引用,将interface值替换为GenericService全路径名
interfaceClass = GenericService.class;
} else {
try {
// 加载interfacename
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// dubbo:method引用的方法是否在interface指定的接口中存在
checkInterfaceAndMethods(interfaceClass, methods);
}
// dubbo服务消费端resolve机制,消息消费者绕过注册中心只连服务提供者 -Dinterface=dubbo://127.0.0.1:20880,其中interface为dubbo:reference interface属性的值
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
//-Ddubbo.resolve.file=文件路径名来指定
if (resolve == null || resolve.length() == 0) {
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
resolveFile = userResolveFile.getAbsolutePath();
}
}
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
properties.load(fis);
} catch (IOException e) {
throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
} finally {
try {
if(null != fis) fis.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
resolve = properties.getProperty(interfaceName);
}
}
// 如果resolve不为空,则填充ReferenceBean的url属性为resolve(点对点服务提供者URL)
if (resolve != null && resolve.length() > 0) {
url = resolve;
if (logger.isWarnEnabled()) {
if (resolveFile != null && resolveFile.length() > 0) {
logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
} else {
logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
}
}
}
if (consumer != null) {
if (application == null) {
application = consumer.getApplication();
}
if (module == null) {
module = consumer.getModule();
}
if (registries == null) {
registries = consumer.getRegistries();
}
if (monitor == null) {
monitor = consumer.getMonitor();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
// 校验ReferenceBean的application是否为空,如果为空
checkApplication();
// 校验stub、mock实现类与interface的兼容性
checkStubAndMock(interfaceClass);
// 封装服务消费者引用服务提供者URL的属性,这里主要填充side:consume(消费端)、dubbo:2.0.0(版本)、timestamp、pid:进程ID。
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// 如果不是泛化引用,增加methods:interface的所有方法名,多个用逗号隔开
if (! isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if(methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
}
else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 存储application配置、module配置、默认消费者参数(ConsumerConfig)、服务消费者dubbo:reference的属性
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
//
String prifix = StringUtils.getServiceKey(map);
if (methods != null && methods.size() > 0) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
appendAttributes(attributes, method, prifix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
}
}
//attributes通过系统context进行存储.
StaticContext.getSystemContext().putAll(attributes);
// 创建消息消费者代理
ref = createProxy(map);
}
//创建消息消费者代理
private T createProxy(Map<String, String> map) {
// 判断该消费者是否是引用本(JVM)内提供的服务
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
//默认情况下如果本地有服务暴露,则引用本地服务.
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
//本地JVM中的服务,则利用InjvmProtocol创建Invoker
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// 如果直连提供者的协议为registry,则对url增加refer属性,其值为消息消费者所有的属性。(表示从注册中心发现服务提供者)
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 如果是其他协议提供者,则合并服务提供者与消息消费者的属性,并移除服务提供者默认属性。以default开头的属性。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 通过注册中心配置拼装URL 普通消息消费者,从注册中心订阅服务
//获取所有注册中心URL,其中参数false表示消费端 需要排除dubbo:registry subscribe=false的注册中心,其值为false表示不接受订阅
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
// 根据注册中心URL,构建监控中心URL
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {//在注册中心URL后增加属性monitor
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 在注册中心URL中,追加属性refer,其值为消费端的所有配置组成的URL
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 如果只有一个服务提供者URL,则直接根据协议构建Invoker
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {// 若有多个服务提供者可构成一个集群
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL 集群模式实现的Invoker
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
// dubbo:referecnce的check=true或默认为空,则需要判断服务提供者是否存在
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
// AbstractProxyFactory#getProxy
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
Class<?>[] interfaces = null;
// 从消费者URL中获取interfaces的值,用,分隔出单个服务应用接口
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
// 增加默认接口EchoService接口
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i ++) {
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};
}
// 根据需要实现的接口,使用jdk或Javassist创建代理类
return getProxy(invoker, interfaces);
}
参考:
https://blog.csdn.net/prestigeding/article/details/80637239
网友评论