接上一章内容,当service的本地发布结束后进入远程发布,远程发布流程与本地发布类似将ref转为invoker然后将invoker抓为expoter,代码如下:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
可以看到,ref转invoker的方法与上一章中的exportLocal转化的过程类似,执行的都是JavassistProxyFactory中的getInvoker方法,然后返回一个new 的AbstractProxyInvoker对象。
之后将得到的AbstractProxyInvoker封装成一个DelegateProviderMetaDataInvoker。
接下来我们来看Exporter<?> exporter = protocol.export(wrapperInvoker)这一步。
执行步骤:
1、通过协议protocol发布之前生成的DelegateProviderMetaDataInvoker,将invoker转化为expoter
2、将invoker转换为exporter --》启动netty--》注册服务到zookeeper--》订阅--》返回新的exporter实例
3、将exporter放入缓存对象
这里的protocol与proxyFactory一样是javassist生成的一个class“protocol$Adaptive”,执行的export部分代码如下:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
f (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//registry
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
根据代码可以看出最后执行的是RegistryProtocol.export(final Invoker<T> originInvoker);
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);//进行发布处理
URL registryUrl = getRegistryUrl(originInvoker);
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
register(registryUrl, registedProviderUrl);//将url路径注册到zookeeper上
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
我们重点看方法doLocalExport:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);//在这步会将invoker转为exporter
bounds.put(key, exporter);
}
}
}
return exporter;
}
该方法首先查看缓存中时候存在相应的expoter,不存在的话通过protocol发布创建一个新的ExporterChangeableWrapper,debug可以发现这里的协议类型为
DubboProtocol.export(Invoker<T> invoker) 中主要做两件事:
1、将invoker转为expoter
2、启动netty
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();//参数invoker对象是由ProtocolFilterWrapper中生成的封装对象
// export service.
String key = serviceKey(url);//key = com.alibaba.dubbo.demo.DemoService:20880
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);//todo 是否提供stub存根
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);//todo 是否提供回声服务
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));//createServer通过url创建service并且放入到map缓存中
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
/**
* 启动通过serverKey对应名称的服务服务,默认为netty
* @param url
* @return
*/
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));//默认心跳事件为true
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);//获取服务名称(默认netty)
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);//requestHandler在RegistryProtocol创建时默认被创建的一个对象
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
/***netty开启*/
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);//getExchanger(url)通过url参数获取HeaderExchanger ,然后调用其bind方法
}
其中的ExchangeHandler 为:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {//后期通信调用
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
HeaderExchanger 中执行的步骤分为如下:
1、将dubbo中创建的ExchangeHandler进行包装转为HeaderExchangeHandler(HeaderExchangeHandler中重新封装ExchangeHandler中的连接、断开等方法)然后将HeaderExchangeHandler转为DecodeHandler (DecodeHandler提供封装的received方法,received中调用内部的编码方法)
2、调用Transporters的bind方法,改方法最终调用NettyTransporter创建一个NettyServer服务
3、返回新建的HeaderExchangeServer
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public class NettyTransporter implements Transporter {
public static final String NAME = "netty4";
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
netty的内容这里暂时不做详细解释,有兴趣的可以自己看看。
OK,执行完毕后我们返回RegistryProtocol的export方法doLocalExport(originInvoker)执行完成后会执行final Registry registry = getRegistry(originInvoker);进行zkClient的创建,执行代码如下:
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
/**
* 因url中的注册中心参数为zookeeper故该出调用的实际是ZookeeperRegistryFactory
*/
return registryFactory.getRegistry(registryUrl);
}
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();//key = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService(基于demo)
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url);//启动zookeeper
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
完成后调用register(registryUrl, registedProviderUrl)将url注册到zookeeper。最终代码返回一个new Exporter对象。
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
网友评论