Common methods of Dubbo's SPI
// Get the adaptive SPI extension; the concrete implementation is decided at runtime
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
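// Get the auto-activated SPI extensions; which ones are active depends on the URL and the key; usually returns a list
ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(url, key);
// Get a named SPI extension; the implementation is known up front
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name);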
Before reading the source, we first need to see how an ExtensionLoader is initialized and what a loader holds. Here is the source of getExtensionLoader(Interface.class):
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
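Initializing an ExtensionLoader is simple: the interface class is assigned to the member variable type. The SPI files for the interface are loaded lazily; they are read only the first time an extension is requested, for example through getExtension. The getExtension method is covered next.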
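The per-type caching in getExtensionLoader can be sketched in isolation. This is a minimal sketch, not Dubbo's code: TypeLoader and forType are hypothetical names standing in for ExtensionLoader and getExtensionLoader, and the @SPI annotation check is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for ExtensionLoader: one loader instance per interface type.
final class TypeLoader<T> {
    private static final ConcurrentMap<Class<?>, TypeLoader<?>> LOADERS =
            new ConcurrentHashMap<Class<?>, TypeLoader<?>>();

    private final Class<T> type;

    private TypeLoader(Class<T> type) {
        this.type = type; // nothing is read from disk here; loading stays lazy
    }

    @SuppressWarnings("unchecked")
    static <T> TypeLoader<T> forType(Class<T> type) {
        if (type == null || !type.isInterface()) {
            throw new IllegalArgumentException("type must be a non-null interface");
        }
        TypeLoader<T> loader = (TypeLoader<T>) LOADERS.get(type);
        if (loader == null) {
            // putIfAbsent keeps the first instance if two threads race here
            LOADERS.putIfAbsent(type, new TypeLoader<T>(type));
            loader = (TypeLoader<T>) LOADERS.get(type);
        }
        return loader;
    }

    Class<T> type() {
        return type;
    }
}

class TypeLoaderDemo {
    public static void main(String[] args) {
        TypeLoader<Runnable> first = TypeLoader.forType(Runnable.class);
        TypeLoader<Runnable> second = TypeLoader.forType(Runnable.class);
        System.out.println(first == second); // same cached loader both times
    }
}
```

The get / putIfAbsent / get sequence avoids locking: a losing thread may build a spare loader, but every caller ends up with the one that was stored first.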
1.1 Common methods of Dubbo's SPI --- getExtension
Let's start reading the source with the simplest method, getExtension:
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
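On the first call every cache is still empty, so execution is bound to reach the core method createExtension. Double-checked locking is used here so that concurrent callers end up with a single instance per name.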
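The locking pattern in getExtension can be tried out on its own. The sketch below assumes a holder with a volatile field; Slot and LazyInstances are hypothetical names, and the factory function plays the role of createExtension.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical holder: a volatile slot that also serves as the per-name lock.
final class Slot<T> {
    private volatile T value;
    T get() { return value; }
    void set(T value) { this.value = value; }
}

final class LazyInstances<T> {
    private final ConcurrentMap<String, Slot<T>> cache = new ConcurrentHashMap<String, Slot<T>>();
    private final Function<String, T> factory;

    LazyInstances(Function<String, T> factory) {
        this.factory = factory;
    }

    T get(String name) {
        Slot<T> slot = cache.get(name);
        if (slot == null) {
            cache.putIfAbsent(name, new Slot<T>());
            slot = cache.get(name);
        }
        T instance = slot.get();           // first check, without the lock
        if (instance == null) {
            synchronized (slot) {          // lock only this name's slot
                instance = slot.get();     // second check, under the lock
                if (instance == null) {
                    instance = factory.apply(name);
                    slot.set(instance);
                }
            }
        }
        return instance;
    }
}

class LazyInstancesDemo {
    // Starts several threads asking for the same name and counts factory calls.
    static int creationsUnderContention(int threads) {
        AtomicInteger created = new AtomicInteger();
        LazyInstances<Object> instances = new LazyInstances<Object>(name -> {
            created.incrementAndGet();
            return new Object();
        });
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> instances.get("dubbo"));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(creationsUnderContention(8));
    }
}
```

Locking on the per-name slot rather than on the whole cache means that creating one extension does not block lookups of another.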
Now look at createExtension:
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
The first line of the method, getExtensionClasses, is what loads the SPI config files (on the first call only). Here is that method:
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
On the first call the cache is empty, so execution is bound to reach loadExtensionClasses.
// Callers of this method are already synchronized in getExtensionClasses.
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if (value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
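As shown, loadFile loads the SPI config of this interface from META-INF/dubbo/internal/, META-INF/dubbo/ and META-INF/services/ into the extensionClasses map.
The loading itself simply reads each file line by line, parses every line into a key-value pair, and records the implementation class for each key. For example, the SPI config of the com.alibaba.dubbo.rpc.Protocol interface looks like this: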
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
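So the implementation class for key=dubbo is com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol. Lines without a key, such as the HttpProtocol line, get their name derived inside loadFile. Let's take a look at loadFile: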
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL url = urls.nextElement();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
try {
String line = null;
while ((line = reader.readLine()) != null) {
// '#' starts a comment
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
Class<?> clazz = Class.forName(line, true, classLoader);
if (! type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) {
if(cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (! cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
} else {
try {
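// Key point: look for a constructor whose single parameter is of type `type`. If it exists, the class is a wrapper and goes into cachedWrapperClasses, which is used later at instantiation time. For example, DubboProtocol is decorated by ProtocolFilterWrapper and ProtocolListenerWrapper by default.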
clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} catch (NoSuchMethodException e) {
clazz.getConstructor();
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name == null || name.length() == 0) {
if (clazz.getSimpleName().length() > type.getSimpleName().length()
&& clazz.getSimpleName().endsWith(type.getSimpleName())) {
name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
} else {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
}
}
}
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (! cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + url + ") in " + url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}
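The line-parsing rules in loadFile above can be sketched on plain strings. This is a simplified sketch under stated assumptions: SpiLineParser is a hypothetical helper, it does not load any class, and it skips the findAnnotationName step, going straight to the fallback that derives the name from the class's simple name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper that applies the same per-line rules as loadFile, on strings only.
class SpiLineParser {
    // Returns (extension name, class name), or null for blank and comment-only lines.
    static Map.Entry<String, String> parse(String line, String interfaceSimpleName) {
        int commentStart = line.indexOf('#');
        if (commentStart >= 0) {
            line = line.substring(0, commentStart); // drop the trailing comment
        }
        line = line.trim();
        if (line.isEmpty()) {
            return null;
        }
        String name = null;
        int eq = line.indexOf('=');
        if (eq > 0) {
            name = line.substring(0, eq).trim();
            line = line.substring(eq + 1).trim();
        }
        if (line.isEmpty()) {
            return null;
        }
        if (name == null || name.isEmpty()) {
            // No key given: HttpProtocol for interface Protocol becomes "http".
            String simple = line.substring(line.lastIndexOf('.') + 1);
            if (simple.length() > interfaceSimpleName.length()
                    && simple.endsWith(interfaceSimpleName)) {
                name = simple
                        .substring(0, simple.length() - interfaceSimpleName.length())
                        .toLowerCase(Locale.ROOT);
            } else {
                throw new IllegalStateException("No extension name for class " + line);
            }
        }
        return new SimpleEntry<String, String>(name, line);
    }

    public static void main(String[] args) {
        System.out.println(parse("dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol", "Protocol"));
        System.out.println(parse("com.alibaba.dubbo.rpc.protocol.http.HttpProtocol", "Protocol"));
    }
}
```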
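Besides parsing the config files (those under META-INF/dubbo/internal/ among them), loadFile has one more important job: any class that has the special constructor is added to cachedWrapperClasses as a wrapper class. Let's go back to the createExtension code: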
private T createExtension(String name) {
// If name=dubbo, clazz is com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// Call DubboProtocol's no-arg constructor to instantiate DubboProtocol
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
// Here is the key part: cachedWrapperClasses from above is used. The instance being decorated is the DubboProtocol instance created above.
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
Next comes injectExtension, which injects other SPI extensions into the instance. The source:
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
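As shown, the method walks over every public method that starts with set and takes one parameter, and asks objectFactory.getExtension for an object matching the parameter type and the property name. If the extension depends on another extension point, that dependency is loaded through objectFactory and injected. Loading the dependency runs through the extension-loading path again, including injectExtension, so the process recurses until no dependencies remain. One open question: can extension points depend on each other, like circular dependencies between Spring beans?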
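The setter scan can be sketched without Dubbo. In this sketch SetterInjector, Transport and Client are hypothetical names, and the lookup function stands in for objectFactory.getExtension(type, property); unlike the original, a failed invocation is rethrown instead of logged.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Locale;
import java.util.function.BiFunction;

interface Transport {
    String name();
}

// A class with one injectable dependency, exposed through a public setter.
class Client {
    private Transport transport;
    public void setTransport(Transport transport) { this.transport = transport; }
    public Transport getTransport() { return transport; }
}

class SetterInjector {
    // lookup stands in for objectFactory.getExtension(parameterType, propertyName).
    static <T> T inject(T instance, BiFunction<Class<?>, String, Object> lookup) {
        for (Method method : instance.getClass().getMethods()) {
            String methodName = method.getName();
            if (methodName.startsWith("set") && methodName.length() > 3
                    && method.getParameterTypes().length == 1
                    && Modifier.isPublic(method.getModifiers())) {
                Class<?> parameterType = method.getParameterTypes()[0];
                // setTransport -> "transport"
                String property = methodName.substring(3, 4).toLowerCase(Locale.ROOT)
                        + methodName.substring(4);
                Object dependency = lookup.apply(parameterType, property);
                if (dependency != null) {
                    try {
                        method.invoke(instance, dependency);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("fail to inject via " + methodName, e);
                    }
                }
            }
        }
        return instance;
    }
}

class InjectDemo {
    static String injectedTransportName() {
        Transport netty = () -> "netty";
        Client client = SetterInjector.inject(new Client(),
                (type, property) ->
                        type == Transport.class && "transport".equals(property) ? netty : null);
        return client.getTransport().name();
    }

    public static void main(String[] args) {
        System.out.println(injectedTransportName());
    }
}
```

Setters whose lookup returns null are simply skipped, which is why plain configuration setters on an extension are left alone.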
After injectExtension finishes, Dubbo automatically wraps every extension in the wrapper classes (provided there are any). The wrapping code:
// ...
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
The code above passes instance to each wrapper's special constructor to decorate it.
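cachedWrapperClasses is filled while the SPI config files are loaded: every wrapper class found while parsing the lines is put into the cachedWrapperClasses set, and the set is then used here. By default Dubbo wraps every ordinary implementation of an SPI in that SPI's wrapper classes. With the Protocol config above, all ordinary implementations (dubbo, registry, injvm and so on) are wrapped in both ProtocolListenerWrapper and ProtocolFilterWrapper, giving two layers. This is comparable to AOP in Spring: once a wrapper is defined and listed in the config file, Dubbo wraps the ordinary implementations in it automatically. Logic shared by all implementations, such as logging or permission checks, can therefore live in a wrapper.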
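The decoration loop can be reproduced with a toy interface. This is a sketch only: Greeter and its implementations are hypothetical, and a LinkedHashSet is used so that the iteration order in the demo is predictable.

```java
import java.util.LinkedHashSet;
import java.util.Set;

interface Greeter {
    String greet(String who);
}

// Ordinary implementation: no constructor that takes a Greeter.
class PlainGreeter implements Greeter {
    public String greet(String who) { return "hello " + who; }
}

// Wrappers: each has a public constructor taking the interface type.
class FilterGreeter implements Greeter {
    private final Greeter next;
    public FilterGreeter(Greeter next) { this.next = next; }
    public String greet(String who) { return "filter(" + next.greet(who) + ")"; }
}

class ListenerGreeter implements Greeter {
    private final Greeter next;
    public ListenerGreeter(Greeter next) { this.next = next; }
    public String greet(String who) { return "listener(" + next.greet(who) + ")"; }
}

class WrapperChainDemo {
    // Same shape as the loop in createExtension: each wrapper receives the current instance.
    static <T> T wrap(Class<T> type, T instance, Set<Class<?>> wrappers) {
        try {
            for (Class<?> wrapperClass : wrappers) {
                instance = type.cast(wrapperClass.getConstructor(type).newInstance(instance));
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not apply wrappers", e);
        }
    }

    static String demo() {
        Set<Class<?>> wrappers = new LinkedHashSet<Class<?>>();
        wrappers.add(FilterGreeter.class);
        wrappers.add(ListenerGreeter.class);
        return wrap(Greeter.class, new PlainGreeter(), wrappers).greet("dubbo");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The wrapper visited last ends up outermost, so the nesting depends entirely on the iteration order of the set that holds the wrapper classes.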
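At this point getExtension has given us a statically chosen SPI implementation. Dubbo, however, relies heavily on dynamically chosen extensions, where the implementation has to be decided at runtime. Take the Protocol extension point: at initialization Dubbo only creates one adaptive extension, and at call time the protocol carried in the URL (url.getProtocol(), for example dubbo) decides which implementation is invoked. The next section covers this.
1.2 Common methods of Dubbo's SPI --- getAdaptiveExtension
Starting from the source: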
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
On the first call all adaptive caches are empty, so execution is bound to reach createAdaptiveExtension. The source:
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
}
}
injectExtension was covered above (it injects the other extensions this one depends on), so we skip it. Look at getAdaptiveExtensionClass:
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
Nothing special in this method. Look at createAdaptiveExtensionClass:
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
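Nothing special here either, but note two things: createAdaptiveExtensionClassCode generates the Java source of the adaptive implementation, and compiler.compile then compiles that source and loads the class into the JVM so the program can use it. We focus on createAdaptiveExtensionClassCode below. For compile, see doCompile in JavassistCompiler; it is not covered here.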
private String createAdaptiveExtensionClassCode() {
StringBuilder codeBuidler = new StringBuilder();
Method[] methods = type.getMethods();
boolean hasAdaptiveAnnotation = false;
for (Method m : methods) {
if (m.isAnnotationPresent(Adaptive.class)) {
hasAdaptiveAnnotation = true;
break;
}
}
// No method is annotated with @Adaptive, so there is no adaptive class to generate
if (!hasAdaptiveAnnotation)
throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");
codeBuidler.append("package " + type.getPackage().getName() + ";");
codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";");
codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adaptive" + " implements " + type.getCanonicalName() + " {");
for (Method method : methods) {
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();
Class<?>[] ets = method.getExceptionTypes();
Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
StringBuilder code = new StringBuilder(512);
if (adaptiveAnnotation == null) {
code.append("throw new UnsupportedOperationException(\"method ")
.append(method.toString()).append(" of interface ")
.append(type.getName()).append(" is not adaptive method!\");");
} else {
int urlTypeIndex = -1;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].equals(URL.class)) {
urlTypeIndex = i;
break;
}
}
// There is a parameter of type URL
if (urlTypeIndex != -1) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");",
urlTypeIndex);
code.append(s);
s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex);
code.append(s);
}
// No parameter is of type URL
else {
String attribMethod = null;
// Look for a URL-returning getter on one of the parameters
LBL_PTS:
for (int i = 0; i < pts.length; ++i) {
Method[] ms = pts[i].getMethods();
for (Method m : ms) {
String name = m.getName();
if ((name.startsWith("get") || name.length() > 3)
&& Modifier.isPublic(m.getModifiers())
&& !Modifier.isStatic(m.getModifiers())
&& m.getParameterTypes().length == 0
&& m.getReturnType() == URL.class) {
urlTypeIndex = i;
attribMethod = name;
break LBL_PTS;
}
}
}
if (attribMethod == null) {
throw new IllegalStateException("fail to create adative class for interface " + type.getName()
+ ": not found url parameter or url attribute in parameters of method " + method.getName());
}
// Null point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");",
urlTypeIndex, pts[urlTypeIndex].getName());
code.append(s);
s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");",
urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod);
code.append(s);
s = String.format("%s url = arg%d.%s();", URL.class.getName(), urlTypeIndex, attribMethod);
code.append(s);
}
String[] value = adaptiveAnnotation.value();
// No key set: use the extension interface's simple name, split into dot-separated lowercase words, as the key
if (value.length == 0) {
char[] charArray = type.getSimpleName().toCharArray();
StringBuilder sb = new StringBuilder(128);
for (int i = 0; i < charArray.length; i++) {
if (Character.isUpperCase(charArray[i])) {
if (i != 0) {
sb.append(".");
}
sb.append(Character.toLowerCase(charArray[i]));
} else {
sb.append(charArray[i]);
}
}
value = new String[]{sb.toString()};
}
boolean hasInvocation = false;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i);
code.append(s);
s = String.format("\nString methodName = arg%d.getMethodName();", i);
code.append(s);
hasInvocation = true;
break;
}
}
String defaultExtName = cachedDefaultName;
String getNameCode = null;
for (int i = value.length - 1; i >= 0; --i) {
if (i == value.length - 1) {
if (null != defaultExtName) {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
} else {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
else
getNameCode = "url.getProtocol()";
}
} else {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
else
getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
}
}
code.append("\nString extName = ").append(getNameCode).append(";");
// check extName == null?
String s = String.format("\nif(extName == null) " +
"throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");",
type.getName(), Arrays.toString(value));
code.append(s);
s = String.format("\n%s extension = (%<s)%s.getExtensionLoader(%s.class).getExtension(extName);",
type.getName(), ExtensionLoader.class.getSimpleName(), type.getName());
code.append(s);
// return statement
if (!rt.equals(void.class)) {
code.append("\nreturn ");
}
s = String.format("extension.%s(", method.getName());
code.append(s);
for (int i = 0; i < pts.length; i++) {
if (i != 0)
code.append(", ");
code.append("arg").append(i);
}
code.append(");");
}
codeBuidler.append("\npublic " + rt.getCanonicalName() + " " + method.getName() + "(");
for (int i = 0; i < pts.length; i++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(pts[i].getCanonicalName());
codeBuidler.append(" ");
codeBuidler.append("arg" + i);
}
codeBuidler.append(")");
if (ets.length > 0) {
codeBuidler.append(" throws ");
for (int i = 0; i < ets.length; i++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(ets[i].getCanonicalName());
}
}
codeBuidler.append(" {");
codeBuidler.append(code.toString());
codeBuidler.append("\n}");
}
codeBuidler.append("\n}");
if (logger.isDebugEnabled()) {
logger.debug(codeBuidler.toString());
}
return codeBuidler.toString();
}
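One small piece of the method above is easy to check in isolation: the default key used when @Adaptive carries no value. The sketch below repeats that character loop in a hypothetical helper named AdaptiveKeys.

```java
// Hypothetical helper that repeats the default-key rule from the code generator.
class AdaptiveKeys {
    // Splits a camel-case simple name into dot-separated lowercase words.
    static String defaultKey(String simpleName) {
        StringBuilder sb = new StringBuilder(simpleName.length() + 8);
        for (int i = 0; i < simpleName.length(); i++) {
            char c = simpleName.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i != 0) {
                    sb.append('.'); // word boundary, except at the very start
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(defaultKey("Protocol"));
        System.out.println(defaultKey("ProxyFactory"));
    }
}
```

For an interface named Protocol the key is protocol, which is the special case the generator maps to url.getProtocol() instead of a URL parameter lookup.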
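The goal of createAdaptiveExtensionClassCode is to build the adaptive implementation of the SPI interface. The method is long, but all of it exists to emit correct Java source. Taking the Protocol interface as an example, the generated adaptive implementation is: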
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) {
throw new IllegalArgumentException("url == null");
}
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
+ url.toString()
+ ") use keys([protocol])");
}
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(
com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
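Look at the generated Java source (it is compiled by the Javassist-based compiler mentioned earlier): the class is named Protocol$Adaptive and implements the Protocol interface. Only two methods, refer and export, have a real body; the other two just throw UnsupportedOperationException. Take refer: the core line is String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()), which reads the protocol from the URL at call time. That name is then passed to getExtension, which brings us back to the simple lookup covered in section 1.1.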
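The idea behind the generated class can be written by hand in a few lines. The sketch below is not Dubbo code: Handler and AdaptiveHandler are hypothetical, java.net.URI replaces Dubbo's URL, the URI scheme plays the role of url.getProtocol(), and a plain map replaces getExtension(extName).

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

interface Handler {
    String handle(URI url);
}

// Hand-written counterpart of a generated Xxx$Adaptive class:
// it holds no logic of its own and picks the real implementation per call.
class AdaptiveHandler implements Handler {
    private final Map<String, Handler> extensionsByName; // stands in for getExtension(extName)
    private final String defaultName;                    // stands in for the @SPI default

    AdaptiveHandler(Map<String, Handler> extensionsByName, String defaultName) {
        this.extensionsByName = extensionsByName;
        this.defaultName = defaultName;
    }

    public String handle(URI url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String extName = url.getScheme() == null ? defaultName : url.getScheme();
        Handler extension = extensionsByName.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension " + extName);
        }
        return extension.handle(url);
    }
}

class AdaptiveDemo {
    static String dispatch(String url) {
        Map<String, Handler> byName = new HashMap<String, Handler>();
        byName.put("dubbo", u -> "dubbo handled " + u.getPath());
        byName.put("injvm", u -> "injvm handled " + u.getPath());
        Handler adaptive = new AdaptiveHandler(byName, "dubbo");
        return adaptive.handle(URI.create(url));
    }

    public static void main(String[] args) {
        System.out.println(dispatch("injvm://127.0.0.1/demo"));
        System.out.println(dispatch("//127.0.0.1/demo")); // no scheme: falls back to the default
    }
}
```

Callers hold only the adaptive object, so the choice of implementation can change from one call to the next without any of them being rewired.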
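Summary:
Why have an adaptive extension class at all?
Because we want to choose the SPI implementation dynamically, while Java's native SPI only supports static lookup. Code that uses the interface needs some object to hold before the real implementation is known, so Dubbo first generates a very thin implementation of the interface and instantiates that. When refer or export is called, this thin class uses the URL context to pick the concrete SPI implementation at runtime, which is what makes it truly flexible. Pretty neat.
1.3 Common methods of Dubbo's SPI --- getActivateExtension
This method returns the extensions annotated with @Activate. The source: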
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
getExtensionClasses();
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
if (!names.contains(name)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
if (Constants.DEFAULT_KEY.equals(name)) {
if (usrs.size() > 0) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (usrs.size() > 0) {
exts.addAll(usrs);
}
return exts;
}
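Many of the helper methods were explained above and are not repeated. The goal of this method: take the name and Activate pairs stored in cachedActivates, filter them by group, by values and so on, and then call getExtension to obtain the extension instances.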
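The ordering rules in getActivateExtension are easier to follow on plain names. The sketch below works on strings only and makes some assumptions: autoActivated is taken to be the list of names that already passed the group and URL checks, in sorted order, and the two constants are assumed to be "-" for Constants.REMOVE_VALUE_PREFIX and "default" for Constants.DEFAULT_KEY. ActivateOrder is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ActivateOrder {
    static final String REMOVE_PREFIX = "-";     // assumed value of Constants.REMOVE_VALUE_PREFIX
    static final String DEFAULT_KEY = "default"; // assumed value of Constants.DEFAULT_KEY

    // autoActivated: names that already matched group and URL, already sorted.
    // values: the names the user configured, in order.
    static List<String> resolve(List<String> autoActivated, String... values) {
        List<String> names = Arrays.asList(values);
        List<String> exts = new ArrayList<String>();
        if (!names.contains(REMOVE_PREFIX + DEFAULT_KEY)) {
            for (String name : autoActivated) {
                // skip names the user listed explicitly or removed with "-name"
                if (!names.contains(name) && !names.contains(REMOVE_PREFIX + name)) {
                    exts.add(name);
                }
            }
        }
        List<String> usrs = new ArrayList<String>();
        for (String name : names) {
            if (!name.startsWith(REMOVE_PREFIX) && !names.contains(REMOVE_PREFIX + name)) {
                if (DEFAULT_KEY.equals(name)) {
                    // everything listed before "default" goes in front of the defaults
                    if (!usrs.isEmpty()) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    usrs.add(name);
                }
            }
        }
        exts.addAll(usrs); // everything else goes after the defaults
        return exts;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Arrays.asList("a", "b"), "x", "default", "y", "-a"));
    }
}
```

In the usage above, a is removed by -a, x is placed before the remaining default b because it precedes default in the list, and y is appended at the end.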
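2. Every ExtensionLoader instance in Dubbo has a cachedWrapperClasses member variable. How is it initialized?
The member belongs to ExtensionLoader, so we only need to see how ExtensionLoader itself is initialized. Its constructor: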
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
The constructor is private, so a loader can only be created through ExtensionLoader.getExtensionLoader(Protocol.class). Let's verify:
public static void main(String[] args){
Protocol obj = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("dubbo");
System.out.println(obj);
}
obj does turn out to be of type ProtocolFilterWrapper. So why does asking for the dubbo Protocol return a ProtocolFilterWrapper?
Stepping through with a debugger, we find this code in com.alibaba.dubbo.common.extension.ExtensionLoader#loadFile:
try {
clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} catch (NoSuchMethodException e) {
clazz.getConstructor();
// ... (rest of the catch block omitted)
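Here clazz is the class named on the line that readLine has just read from the Protocol file. The code checks whether that class has a constructor taking type, which is the Protocol interface we passed in. So let's look at the constructor of com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper to confirm that it takes a Protocol. It is: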
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
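Sure enough, its constructor takes a Protocol, so cachedWrapperClasses.add(clazz) is executed. Now we finally know why DubboProtocol always comes back wrapped in ProtocolFilterWrapper. Likewise, the next line readLine returns is ProtocolListenerWrapper, so it is added to cachedWrapperClasses as well.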
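The constructor probe that separates wrappers from ordinary implementations fits in one method. A minimal sketch, with hypothetical Codec classes in place of the Protocol ones:

```java
interface Codec {
    String encode(String text);
}

// Ordinary implementation: only a no-arg constructor.
class JsonCodec implements Codec {
    public JsonCodec() { }
    public String encode(String text) { return "\"" + text + "\""; }
}

// Wrapper: has a public constructor that takes the interface type.
class TracingCodec implements Codec {
    private final Codec delegate;
    public TracingCodec(Codec delegate) { this.delegate = delegate; }
    public String encode(String text) { return "trace:" + delegate.encode(text); }
}

class WrapperDetector {
    // Same probe as in loadFile: a public constructor taking `type` marks a wrapper.
    static boolean isWrapper(Class<?> candidate, Class<?> type) {
        try {
            candidate.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isWrapper(TracingCodec.class, Codec.class));
        System.out.println(isWrapper(JsonCodec.class, Codec.class));
    }
}
```

Class.getConstructor only sees public constructors, so a wrapper whose constructor is not public would be treated as an ordinary implementation by this probe.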
Now that cachedWrapperClasses is populated, how does getExtension("dubbo") end up wrapped?
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
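The method loops over wrapperClasses (a Set, not a list). The original instance is always DubboProtocol. Each iteration executes:
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
This is equivalent to calling the wrapper's constructor with the current instance and assigning the result back to instance. The first wrapper visited wraps DubboProtocol, the second wraps the first wrapper, and so on, so the wrapper visited last ends up outermost. For the main method above to print a ProtocolFilterWrapper, the loop must have visited ProtocolListenerWrapper first and ProtocolFilterWrapper second, which amounts to:
new ProtocolFilterWrapper(new ProtocolListenerWrapper(dubboProtocol)). That is how the layers are built up one by one.
One detail: the nesting order follows the iteration order of cachedWrapperClasses. In the loadFile code shown earlier that field is a ConcurrentHashSet, which does not keep insertion order, so the order of the lines in the Protocol file does not by itself fix the nesting order, and wrappers should not rely on a particular order.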
RegistryProtocol is handled the same way as DubboProtocol.
Summary: in Dubbo, the object obtained through SPI for the dubbo protocol named in a URL is already a wrapped object.