dubbo服务导出始于Spring容器发布刷新事件,dubbo在接收到对应事件的时候就开始服务的导出。服务导出过程主要可以分成以下3个步骤:
- 前置工作,参数检查以及组装URL。URL在dubbo中挺关键的,作为信息流在dubbo中不停地流转
- 服务导出,生成相应的Invoker。主要包含本地导出以及远程导出。
- 服务注册,向注册中心注册服务。
dubbo服务导出实现的效果如下:
-
开启了一个端口20880(可配置),用来处理远程服务调用请求。
-
向注册中心发布了一个可调用地址:dubbo:// 192.168.1.1:20880/interface?……,服务消费者便可以通过注册中心获取服务提供者信息
dubbo配置解析
dubbo的启动源于Spring,Spring容器启动的过程分为xml文件定位、BeanDefinition载入和解析以及BeanDefinition在Ioc容器中的注册。dubbo的provider和consumer配置文件也随之载入解析,对应的配置信息也就被载入生成,组装配置信息URL。始于DubboNamespaceHandler,载入了很多BeanDefinitionParser,完成对dubbo配置文件中的标签进行解析:
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
由上可以看到很多解析标签类,在服务导出中用的有application、registry、protocol、monitor、service等。
dubbo服务导出
完成了配置的解析之后,在Spring发布刷新事件之后,便开始了服务导出操作:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//服务是否延迟导出或延迟导出
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
}
public class ServiceConfig<T> extends AbstractServiceConfig {
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
//如果配置了延迟导出,便启动一个延迟任务,在延迟任务中进行服务导出操作
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
//即时导出服务
doExport();
}
}
}
服务导出的时候会根据配置进行检查是否需要服务到处以及延迟导出,延迟导出会启动一个延迟任务。
protected synchronized void doExport() {
//......
//检查服务是否已导出
if (exported) {
return;
}
//服务导出标志位true
exported = true;
//接口名称为空,抛出异常
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
//.......
//是否为泛化服务
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
//根据当前线程上线文类加载器加载当前接口类
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
//检查接口和方法,接口类是否未加载,方法是否为当前接口的方法
checkInterfaceAndMethods(interfaceClass, methods);
//检查接口实现类是否实现的当前类
checkRef();
generic = Boolean.FALSE.toString();
}
//.......
//检查application配置,并将参数添加到application中
checkApplication();
//检查registry配置,并将参数添加到registry中
checkRegistry();
//检查protocol配置,并将参数添加到protocol中
checkProtocol();
//追加参数到ServiceConfig
appendProperties(this);
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
//服务导出
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
服务导出前的检查已经参数的组装,在ServiceConfig服务配置中进行,然后是进行服务导出。
private void doExportUrls() {
//加载注册地址信息,可能配置多注册中心
List<URL> registryURLs = loadRegistries(true);
//根据不同的协议,进行服务导出,这里可以是多协议和多注册中心
for (ProtocolConfig protocolConfig : protocols) {
//服务导出
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
这里根据配置的注册中心和配置的协议不同,进行不同的服务导出。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//获取协议名称,默认dubbo协议
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
//处理参数
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
//...参数处理
//接口方法级别的配置处理,譬如接口中的某个方法禁止retry
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
//...
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
//这里获取接口的方法数组,这里的接口被包装成了Wrapper,是通过反射修改字节码,不知道为什么要包装成Wrapper,没有太理解其中的奥妙
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
//将方法名称用逗号链接起来,放入参数中
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
//配置URL信息
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//获取接口的scope配置信息
String scope = url.getParameter(Constants.SCOPE_KEY);
//scope=none时不做任何事情
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//scope != remote时,进行服务导出到本地JVM
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//scope != local时,进行服务导出到远程
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
//遍历注册中心,不同的注册中心进行服务导出
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
//获取Invoker,这里是通过JavasisstProxyFacatory获取的Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//这里没有什么,仅仅是对Invoker和MetaData进行包装
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//这里是核心,对服务导出
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
这里ServiceConfig的doExportUrlsFor1Protocol方法的逻辑主要是以下这些:
- 获取协议名称以及参数组装
- 获取接口方法级别的配置参数,譬如retry接口方法重试参数
- 获取接口方法数组,这个时候接口会被包装成Wrapper,不太理解这里为什么要将接口包装成Wrapper,是通过反射修改字节码,进行对接口包装。
- 获取接口scope配置参数,配置参数主要有none、local和remote,没有配置这个参数的时候会进行服务导出到本地JVM和远程。配置local和remote分别只会导出到本地JVM和远程,none配置则不会做任何事情。
- 获取Invoker,这里是通过proxyFacatory.getInvoker()获取的Invoker,ProxyFacatory是通过自适应扩展获取的,dubbo中默认是JavasisstProxyFacatory,具体下面分析
- 服务导出,protocol.export()这里才是真正的服务导出的核心,protocol是通过自适应扩展加载的。
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
//获取接口的Wrapper,服务导出的时候在获取接口方法的时候就已经生成了接口的Wrapper,这里只是从本地缓存中获取
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//这里的Invoker是AbstractProxyInvoker的实现类,然后doInvoke还是通过调用Wrapper的invokerMethod方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
这里可以看到Invoker的具体实现,他是通过JavasisstProxyFacatory生成的,他是AbstractProxyInvoker的实现类,但是doInvoke方法的调用是通过Wrapper的invokeMethod方法进行调用的,所以Invoker后面又借用了Wrapper,而不是直接调用接口的方法,这里的Wrapper还是通过Javasisst拿到接口类Class对象生成对应的字节码,然后通过反射创建具体的Wrapper对象实例。说白了还是靠Javasisst生成字节码然后反射创建实例。
public class ProtocolListenerWrapper implements Protocol {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
}
public class ProtocolFilterWrapper implements Protocol {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
}
这里是ProtocolListenerWrapper和ProtocolFilterWrapper进行export,buildInvokerChain是对dubbo中filter进行的处理,然后是RegistryProtocol进行export处理。
public class RegistryProtocol implements Protocol {
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//服务导出
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//获取注册中心url
URL registryUrl = getRegistryUrl(originInvoker);
//获取Invoker的Registry,这里是ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
//将服务注册到本地注册表中
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
//注册服务到注册中心,以zookeeper为例
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 订阅override信息
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
}
这里的RegistryProtocol的export主要是做了下面这些事情:
- 进行服务导出,开启端口,doLocalExport
- 服务注册到注册中心register
- 订阅override信息
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//com.alibaba.dubbo.demo.DemoService:20880
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//开启服务,核心
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
//192.168.124.14:20880
String key = url.getAddress();
//生成一个ExchangeServer
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
//这里绑定端口创建ExchangeServer,这里的requestHandler是对服务调用请求的处理
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
}
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
//...
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
public class NettyTransporter implements Transporter {
public static final String NAME = "netty4";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
//...
}
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
//...
}
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
//...
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//...
}
}
public class NettyServer extends AbstractServer implements Server {
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
这里梳理下DubboExporter逻辑:
- 创建DubboExporter,将其放入exporterMap中,key为接口完全限定名:port
- 开启服务监听端口,这里主要是网络通信端的内容,以Netty4为例进行分析
下面说下DubboProtocol中createServer逻辑:在这里出现了Exchanger,默认是HeaderExchanger,然后是HeaderExchanger将bind委托给Transporter进行bind,然后自适应找出对应Transporter实现类,这里以NettyTransporter进行分析,NettyTransporter这个时候bind是创建一个NettyServer,然后就是Netty网络通信的事情了,创建NettyServer无非就是绑定20880端口进行监听,并进行处理服务消费方服务调用请求。
这里可以看到Exchanger、Transporter和NettyServer这样的分层,有点类似应用结构的分层Service、Manager和Dao,这里自己理解的是分层逻辑清晰,便于维护也便于扩展,通过SPI结合模板设计模式,每层可以有不同的技术方案实现,很灵活很容易扩展,非常好。个人觉得也是dubbo中的一个亮点。
从DubboProtocol中可以看出,完成了Exporter创建以及开启了网络通信接口,完成了服务提供者对端口的监听,当有服务消费方进行服务调用的时候就可以进行请求的处理,这里就是网络通信Netty层面的东西了。
服务注册
服务注册的作用,主要是为了方便服务的治理,Consumer端可以及时感应到服务提供者的变化。
public class RegistryProtocol implements Protocol {
public void register(URL registryUrl, URL registedProviderUrl) {
//根据RegistryFacatory获取Registry
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
}
public abstract class FailbackRegistry extends AbstractRegistry {
@Override
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
//...
}
}
}
public class ZookeeperRegistry extends FailbackRegistry {
zkClient = zookeeperTransporter.connect(url);
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
public abstract class AbstractZookeeperClient<TargetChildListener> implements ZookeeperClient {
@Override
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
}
//path:/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.124.14%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D3616%26side%3Dprovider%26timestamp%3D1577873268062
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
}
服务注册,主要是将服务提供者的一些信息,在zookeeper上创建对应的临时节点。这里也思考了下为什么是临时节点?临时节点的话,服务提供者和Zookeeper断开连接的话,服务对应创建的临时节点就会自动删除,如果是持久化的节点,就需要手动进行删除,当服务提供者和Zookeeper断开连接的时候对应的持久节点就没有被删除,下次服务提供者启动的时候向注册中心注册服务创建节点的时候就会失败。
总结
以上就是dubbo服务导出的整个过程,主要包含了前期配置解析和检查,Invoker创建以及导出服务到本地JVM和远程,最后是将服务注册到注册中心。一个完整的服务提供者便可以通过注册中心被消费者发现,并未消费者提供服务。这里面穿插了很多的设计模式,譬如工厂、单例(DCL)和模板等设计模式,并且利用dubbo的自适应扩展机制,以及Exchanger、Transporter和NettyServer的分层,让dubbo整个结构层次晓得特别清晰也便于维护,自适应扩展机制让dubbo易于扩展,具体组件的形式可以简单更换。
展望
这是2020年的第一篇博客,元旦的一天假期基本上都花在分析总结dubbo服务导出这块,才总结出了这篇博客。希望自己有始有终,能够继续深入理解分析dubbo源码这块,最后能对Rpc框架这块有自己的一份心得,能够增强自己知识库的储备,让自己多一份闪光点。20年希望自己能在技术深度这块有所突破和巩固,技术宽度上有一定的拓宽,技术视野能够更好点。希望自己能够保有像今天这样的学习热情,生活工作繁忙除外,始终不要忘记学习,始终保有对技术的热情和专注。石头出现裂缝的101捶之前,继续前100捶的敲打,直到石头裂开,加油!
网友评论