本节分析 Dubbo SPI 基础源码。
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Protocol;
ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol dubboProtocol = loader.getExtension("dubbo");
Protocol 接口的定义
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
注意这里的两个核心注解 @SPI
和 @Adaptive
@SPI
:指定一个接口为SPI接口(可扩展接口)
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
/**
* 缺省扩展点名
* default extension name
*/
String value() default "";
}
@Adaptive
:该注解可以注解在两个地方:
接口上
:在 Dubbo 中,仅有AdaptiveExtensionFactory
和AdaptiveCompiler
;接口的方法上
:会动态生成相应的动态类(实际上是一个工厂类,工厂设计模式),例如Protocol$Adapter
Dubbo 的整个加载 SPI 扩展流程都是在 ExtensionLoader
中完成的。
一、获取 ExtensionLoader
ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
ExtensionLoader
可以类比为 JDK SPI 中的ServiceLoader
。
首先来看一下 ExtensionLoader
的类属性:
/**
* 类变量(全局变量),也可以做成一个 ExtensionLoaderFactory 来专门存储这些类变量
*/
/** 存放SPI文件的三个目录 */
/** 1. META-INF/services/ 是jdk的SPI文件的存放目录 */
private static final String SERVICES_DIRECTORY = "META-INF/services/";
/** 2. META-INF/dubbo/ 是第三方SPI文件的存放目录 */
private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
/** 3. META-INF/dubbo/internal/ 是Dubbo内部SPI文件的存放目录 */
private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
/** key: SPI接口Class value: 该接口的ExtensionLoader, eg. "interface com.alibaba.dubbo.rpc.Protocol" -> "com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]" */
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
/** key: SPI接口Class value: SPI实现类的对象实例 */
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
注意
:上述的都是类属性,即所有该类的实例都共享。在 SOFARPC 中,是有一个 ExtensionLoaderFactory
类来存储 Map<Class, ExtensionLoader>
变量的。
再来看一下 ExtensionLoader
的实例属性:
/**
* 实例变量
*/
/** SPI接口Class,eg. Protocol */
private final Class<?> type;
/** 用于 Dubbo IOC 注入实例的创建 */
private final ExtensionFactory objectFactory;
/** key: ExtensionClass的Class value: SPI实现类的key */
private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
/** 存储 type 的所有普通实现类的 spi key-valueClass: Map<String, Class<?>> getExtensionClasses() */
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
/** 如果类上有 @Activate 注解,则缓存到 Map<String, Activate> cachedActivates 中,key=spi key,value=注解 */
private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
/** 缓存创建好的extensionClass实例 */
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
/** 缓存创建好的适配类实例 */
private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
/** 存储类上带有 @Adaptive 注解的Class,如果没有,则自动生成一个适配类 */
private volatile Class<?> cachedAdaptiveClass = null;
/** 如果扩展接口(eg. Protocol)有 @SPI 注解,则获取其 @SPI.value 属性,如果存在,则缓存到 String cachedDefaultName 中 */
private String cachedDefaultName;
/** 存储在创建适配类实例这个过程中发生的错误,后续该 type 下再调用创建适配类的时候,就直接抛错 */
private volatile Throwable createAdaptiveInstanceError;
/** 存放具有一个type入参的构造器的实现类(wrapper类)的Class对象,用于实现 AOP */
private Set<Class<?>> cachedWrapperClasses;
/** key :实现类的全类名 value: exception, 如果加载当前行失败,则存储在这里,防止真正的异常被吞掉 */
private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();
来看一下 getExtensionLoader(Class<T> type)
的源码:
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
/**
* 1. 校验入参type:非空 + 接口 + 含有@SPI注解
*/
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
/**
* 2. 根据type接口从全局缓存EXTENSION_LOADERS中获取ExtensionLoader,如果有直接返回;如果没有,则先创建,之后放入缓存,最后返回
*/
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
创建 ExtensionLoader
:
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
当前创建的 ExtensionLoader
对象的 type 是 com.alibaba.dubbo.rpc.Protocol
,所以此时会执行:objectFactory = ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()
。(实际上,除了 ExtensionFactory 本身之外,其他所有的 SPI 接口的 ExtensionLoader
对象都有一个 ExtensionFactory
实例 objectFactory
,该实例主要用于实现 Dubbo IOC 注入实例的创建)该行代码后续在进行分析。我们目前只需要知道 ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(Class<T> type)
最终得到的实例变量是:
Class<?> type = interface T
ExtensionFactory objectFactory = AdaptiveExtensionFactory(适配类)
- factories = [SpiExtensionFactory实例, SpringExtensionFactory实例]
二、加载指定 SPI 实现类
Protocol dubboProtocol = loader.getExtension("dubbo")
调用层级:
ExtensionLoader<T>.getExtension()
-- createExtension(String name)
// 获取扩展类
-- getExtensionClasses().get(name)
-- loadExtensionClasses()
// 加载一个目录
-- loadDirectory(Map<String, Class<?>> extensionClasses, String dir)
// 加载目录下的一个文件
-- loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL)
// 加载文件中的一个SPI实现类
-- loadClass(Map<String, Class<?>> extensionClasses, Class<?> clazz, String name)
// Dubbo IOC
-- injectExtension(instance);
// Dubbo AOP
-- wrapper包装;
本小节只分析 getExtensionClasses()
;injectExtension(instance)
和 wrapper包装
放在 IOC 和 AOP 部分进行分析。
/**
* 根据传入的spiKey(name,eg.dubbo),创建相应的SPI扩展实例(eg.DubboProtocol),
* 并且将 <spiKey, 创建好的SPI扩展实例> 存储到 Map<String, Holder<Object>> cachedInstances 中,
* eg. <"dubbo", DubboProtocol>
*/
@SuppressWarnings("unchecked")
public T getExtension(String name) {
...
// 创建 @SPI.value 的 extension 的实例
if ("true".equals(name)) {
return getDefaultExtension();
}
// 首先从缓存 Map<String, Holder<Object>> cachedInstances 中获取,如果存在,则直接返回,如果不存在,则双重检测创建实例
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建SPI实现类实例
instance = createExtension(name);
// 进行缓存
holder.set(instance);
}
}
}
return (T) instance;
}
/**
* 调用方已经加了同步锁,无需考虑线程同步问题
*/
private T createExtension(String name) {
// 从普通SPI实现类map中获取传入的spiKey的实现类,例如 DubboProtocol.class
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
// 如果没找到,抛出加载SPI实现类时出现的异常
throw findException(name);
}
try {
// 从缓存中获取SPI实现类实例,如果有,直接返回,如果没有,创建
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 创建 DubboProtocol 实例(这里可以看出,要求SPI实现类必须具有无参构造器),放到缓存中 Map<Class<?>, Object> EXTENSION_INSTANCES
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// Dubbo IOC
injectExtension(instance);
// Dubbo AOP
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw ...;
}
}
/**
* 加载三个 spi 目录下的 Class<?> type(eg. Protocol) 指定的 spi 接口的所有适配类、包装类和普通实现类,进而初始化各个缓存项
* 1. Holder<Map<String, Class<?>>> cachedClasses,存储 type 的所有普通实现类的 <spiKey, valueClass>
* 2. 如果扩展接口(eg. Protocol)有 @SPI 注解,则获取其 @SPI.value 属性,如果存在,则缓存到 String cachedDefaultName 中(例如, "dubbo")
* 3. 之后依次加载三个 spi 目录下的所有 type spi 文件,读取文件中的每一行:
* 3.1 如果类上有 @Adaptive 注解,则缓存到 Class<?> cachedAdaptiveClass 中
* 3.2 如果类是 wrapper 类,则缓存到 Set<Class<?>> cachedWrapperClasses 中
* 3.3 如果类是普通实现类,
* 检测实现类必须有无参构造器,(在 T createExtension(String name) 处会调用无参构造器实例化)
* 如果类上有 @Activate 注解,则缓存到 Map<String, Activate> cachedActivates 中,key=spiKey,value=注解;
* 存储 spiKey 到 Map<Class<?>, String> cachedNames;
* 将当前的 spi 实现类的 <spiKey, value> 存储到 Holder<Map<String, Class<?>>> cachedClasses
* 3.4 如果加载当前行失败,则存储在 Map<String, IllegalStateException> exceptions 中,key :实现类的全类名 value: exception,防止真正的异常被吞掉
*/
private Map<String, Class<?>> getExtensionClasses() {
// 从缓存中获取 SPI 实现类集合,如果有,直接返回,如果没有,双重检测进行加载
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
// synchronized in getExtensionClasses
private Map<String, Class<?>> loadExtensionClasses() {
// 获取扩展接口(eg. Protocol)的 @SPI 注解(SPI接口必须有该注解),获取其 @SPI.value 属性,如果存在,则缓存到 String cachedDefaultName 中
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
...
cachedDefaultName = value;
}
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
/**
* 加载指定目录dir下的SPI实现
*/
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
String fileName = dir + type.getName();
try {
// 同一个名称的文件可能存在于多个 jar 包中
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
...
if (urls != null) {
while (urls.hasMoreElements()) {
// 每一个 url 是一个文件
java.net.URL resourceURL = urls.nextElement();
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error(...);
}
}
/**
* 加载一个文件
*/
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
...
// 每一行是一个 spi 扩展实现
String line;
// 读取每一行
while ((line = reader.readLine()) != null) {
// 去除注释 key=value #xxx
...
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
// 获取 spi key
name = line.substring(0, i).trim();
// 获取 spi 实现类
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
// 加载一个类
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
// 加载失败,存储失败信息
exceptions.put(line, e);
}
}
...
} catch (Throwable t) {
logger.error(...);
}
}
/**
* 加载一个类
*/
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
// adaptive 类、wrapper 类、普通 SPI 类都必须实现 SPI 接口
if (!type.isAssignableFrom(clazz)) {
throw ...
}
// 如果类上有 @Adaptive 注解,则缓存到 Class<?> cachedAdaptiveClass 中(如果有多个 @Adaptive 类,则直接抛出异常)
if (clazz.isAnnotationPresent(Adaptive.class)) {
if (cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (!cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}
// 如果类是 wrapper 类,则缓存到 Set<Class<?>> cachedWrapperClasses 中
} else if (isWrapperClass(clazz)) {
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
// 正常类
} else {
// 检测必须有无参构造器
clazz.getConstructor();
...
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
// 如果类上有 @Activate 注解,则缓存到 Map<String, Activate> cachedActivates 中,key=spiKey,value=注解
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
// 存储 spiKey 到 Map<Class<?>, String> cachedNames
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
// 将 spi 的 key-value 存储到 Map<String, Class<?>> extensionClasses 中,
// 实际上该 extensionClasses 会在最外层 getExtensionClasses() 处存储到 Holder<Map<String, Class<?>>> cachedClasses 中
...
extensionClasses.put(n, clazz);
...
}
}
}
}
private boolean isWrapperClass(Class<?> clazz) {
try {
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
到这里,getExtensionClasses()
就分析完成了,来看下此时的 ExtensionLoader
对象:
- static Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS
-- "interface com.alibaba.dubbo.rpc.Protocol" -> "com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]"
-- "interface com.alibaba.dubbo.common.extension.ExtensionFactory" -> "com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.common.extension.ExtensionFactory]"
- static Map<Class<?>, Object> EXTENSION_INSTANCES
-- "class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol" -> DubboProtocol 实例
-- "class com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory" -> SpiExtensionFactory 实例
-- "class com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory" -> SpringExtensionFactory 实例
- Class<?> type = interface com.alibaba.dubbo.rpc.Protocol;
- ExtensionFactory objectFactory = AdaptiveExtensionFactory(适配类);
-- factories = [SpiExtensionFactory实例, SpringExtensionFactory实例]
- Map<Class<?>, String> cachedNames
-- "class com.alibaba.dubbo.registry.integration.RegistryProtocol" -> "registry"
-- "class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol" -> "injvm"
-- "class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol" -> "dubbo"
-- "class com.alibaba.dubbo.rpc.support.MockProtocol" -> "mock"
- Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
-- "registry" -> "class com.alibaba.dubbo.registry.integration.RegistryProtocol"
-- "injvm" -> "class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol"
-- "dubbo" -> "class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol"
-- "mock" -> "class com.alibaba.dubbo.rpc.support.MockProtocol"
- Map<String, Activate> cachedActivates
- Map<String, Holder<Object>> cachedInstances
-- "dubbo" ->
- Holder<Object> cachedAdaptiveInstance
- Class<?> cachedAdaptiveClass = null;
- String cachedDefaultName = 'dubbo';
- Throwable createAdaptiveInstanceError;
- Set<Class<?>> cachedWrapperClasses;
-- class com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
-- class com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
- Map<String, IllegalStateException> exceptions
三、加载或创建适配类
// 1. 手动适配类,类上带有注解 @Adaptive
ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()
// 2. 自动生成适配类,方法上带有注解 @Adaptive
Protocol adaptiveExtension = loader.getAdaptiveExtension()
3.1、手动创建适配类
Dubbo 中手动创建的适配类只有
ExtensionFactory
和Compiler
,以ExtensionFactory
为例,
image.png
@SPI
public interface ExtensionFactory {
/**
* @param type object type. SPI 类型
* @param name object name. setXxx 中的 xxx
* @return object instance.
*/
<T> T getExtension(Class<T> type, String name);
}
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
/**
* factories = [SpiExtensionFactory, SpringExtensionFactory]
* 也就是说 dubbo 的 ioc 可以注入 "spi适配类" 和 "spring bean"
*/
private final List<ExtensionFactory> factories;
public AdaptiveExtensionFactory() {
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}
@Override
public <T> T getExtension(Class<T> type, String name) {
for (ExtensionFactory factory : factories) {
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
}
从上一小节中 SPI 实现类的加载过程来看,在加载 ExtensionFactory
的过程中,会把 AdaptiveExtensionFactory
缓存到 ExtensionFactory
的 ExtensionLoader
中的 Class<?> cachedAdaptiveClass
属性上。
public T getAdaptiveExtension() {
// 如果创建好的适配器实例存在,则直接返回,否则,双重检测进行创建单例 cachedAdaptiveInstance
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建 适配类 实例
instance = createAdaptiveExtension();
// 将适配类实例缓存到 Holder<Object> cachedAdaptiveInstance 中
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
// 如果创建适配类失败,存储到 Throwable createAdaptiveInstanceError 缓存中
createAdaptiveInstanceError = t;
throw new IllegalStateException(...);
}
}
}
} else {
throw new IllegalStateException(...);
}
}
return (T) instance;
}
/**
* createAdaptiveExtension()
* --getAdaptiveExtensionClass()
* //从dubbo-spi配置文件中获取AdaptiveExtensionClass
* --getExtensionClasses()
* --loadExtensionClasses()
* --loadFile(Map<String, Class<?>> extensionClasses, String dir)
* //创建适配类
* --createAdaptiveExtensionClass()
*
* --injectExtension(T instance) // dubbo ioc
*/
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
private Class<?> getAdaptiveExtensionClass() {
// 1. 加载三个 spi 目录下的 Class<?> type(eg. Protocol) 指定的 spi 接口的所有适配类、包装类和普通实现类,进而初始化各个缓存项
getExtensionClasses();
// 2. 如果存在类上具有 @@Adaptive 注解的Class,则直接返回,否则创建一个 AdaptiveExtensionClass
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
此处因为
cachedAdaptiveClass = AdaptiveExtensionFactory
,所以直接返回。如果没有适配类,那么就走createAdaptiveExtensionClass()
方法(eg.Protocol
)。
3.2、自动生成适配类
private Class<?> createAdaptiveExtensionClass() {
// 1. 拼接 AdaptiveExtensionClass 类代码
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
// 2. 获取编译器
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
// 3. 使用编译器编译 AdaptiveExtensionClass 类代码
return compiler.compile(code, classLoader);
}
对于 Protocol
接口而言,拼接好的代码如下:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
/**
* 工厂类和代理类的结合体
*/
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
@Override
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
@Override
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
@Override
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) {
throw new IllegalArgumentException("url == null");
}
// 1. 从 URL 中获取 spiKey
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) {
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
}
// 2. 根据 spiKey 获取相应的实现类
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
// 3. 调用实现类的方法
return extension.refer(arg0, arg1);
}
@Override
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
}
if (arg0.getUrl() == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) {
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
}
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
对于方法上有 @Adaptive 注解的方法,生成相应的实现类(该实现是一个工厂 + 代理的结合体),对于方法上没有 @Adaptive 注解的方法,直接抛出异常。拼接好之后,获取编译器,调用编译器将 String code 编译为 Class。(关于编译器,后续进行分析,关于拼接源码分析,见 http://dubbo.apache.org/zh-cn/docs/source_code_guide/adaptive-extension.html)
四、Dubbo IOC
/**
* ioc 有效的前提: objectFactory != null
* 遍历当前的实现类(eg. DubboProtocol)中的每一个方法:
* 1. 对于方法是 setXxx + public修饰符 + setXxx入参只有一个的方法,进行如下操作:
* 1.1 如果该 setXxx 方法有 @DisableInject 注解,则当前方法不处理,行下一个方法的处理,否则
* 1.2 获取 setXxx 入参类型 pt + 获取入参属性名 property,
* 之后使用 AdaptiveExtensionFactory.getExtension(pt, property) 来获取被注入的实例
* 1.3 调用 setXxx 将被注入的实例设置到当前的实现类中,实现 ioc
*
*/
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())) {
// 如果该 setXxx 方法有 @DisableInject 注解,则当前方法不处理,行下一个方法的处理
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
// 获取setXxx入参类型
Class<?> pt = method.getParameterTypes()[0];
try {
// 获取入参属性名 setName => name
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
// 使用 AdaptiveExtensionFactory.getExtension(pt, property) 来获取被注入的实例
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// 调用 setXxx 将被注入的实例设置到当前的实现类中,实现 ioc
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error(...);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
从上述的分析来看,除了 ExtensionFactory
本身之外,其他 SPI 接口的 ExtensionFactory objectFactory = AdaptiveExtensionFactory
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
/**
* factories = [SpiExtensionFactory, SpringExtensionFactory]
* 也就是说 dubbo 的 ioc 可以注入 "spi适配类" 和 "spring bean"
*/
private final List<ExtensionFactory> factories;
public AdaptiveExtensionFactory() {
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}
@Override
public <T> T getExtension(Class<T> type, String name) {
for (ExtensionFactory factory : factories) {
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
}
public class SpiExtensionFactory implements ExtensionFactory {
/**
* 注意:因为这里只是返回适配类,所以 name 属性是没用的
*/
@Override
public <T> T getExtension(Class<T> type, String name) {
// 如果type是接口并且有@SPI注解 && type的spi实现不为空,则返回 type 的适配实现类(一个spi接口至少有一个普通实现类,适配类起着工厂和代理的作用)
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
if (!loader.getSupportedExtensions().isEmpty()) {
return loader.getAdaptiveExtension();
}
}
return null;
}
}
public class SpringExtensionFactory implements ExtensionFactory {
private static final Set<ApplicationContext> contexts = new ConcurrentHashSet<ApplicationContext>();
private static final ApplicationListener shutdownHookListener = new ShutdownHookListener();
/**
* 调用点:
* 1. ServiceBean<T> implements ApplicationContextAware
* 2. ReferenceBean<T> implements ApplicationContextAware
* 3. 手动调用
*/
public static void addApplicationContext(ApplicationContext context) {
contexts.add(context);
BeanFactoryUtils.addApplicationListener(context, shutdownHookListener);
}
...
/**
* 1. 首先按 spring bean 名称查找:遍历 Set<ApplicationContext>,如果包含 name 的 bean,则直接获取该 bean,判断该 bean 属于 type,则直接返回;否则,
* 2. 上述没有查找到,则直接遍历 Set<ApplicationContext>,调用 ctx.getBean(type) 根据 type 来创建 bean(如果同时有多个实现类或者一个也没有,那么根据 type 获取直接抛出)
*/
@Override
public <T> T getExtension(Class<T> type, String name) {
// 1. 首先按 spring bean 名称查找
for (ApplicationContext context : contexts) {
if (context.containsBean(name)) {
Object bean = context.getBean(name);
if (type.isInstance(bean)) {
return (T) bean;
}
}
}
// 2. 按 spring bean type 查找
for (ApplicationContext context : contexts) {
try {
return context.getBean(type);
} catch (NoUniqueBeanDefinitionException multiBeanExe) {
// 有多个实现,报错
...
} catch (NoSuchBeanDefinitionException noBeanExe) {
// 没有实现,报错
...
}
}
return null;
}
/**
* 关闭钩子
**/
private static class ShutdownHookListener implements ApplicationListener {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextClosedEvent) {
// we call it anyway since dubbo shutdown hook make sure its destroyAll() is re-entrant.
// pls. note we should not remove dubbo shutdown hook when spring framework is present, this is because
// its shutdown hook may not be installed.
DubboShutdownHook shutdownHook = DubboShutdownHook.getDubboShutdownHook();
shutdownHook.destroyAll();
}
}
}
}
五、Dubbo AOP
/**
* 调用方已经加了同步锁,无需考虑线程同步问题
*/
private T createExtension(String name) {
// 从普通SPI实现类map中获取传入的spiKey的实现类,例如 DubboProtocol.class
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
// 如果没找到,抛出加载SPI实现类时出现的异常
throw findException(name);
}
try {
// 从缓存中获取SPI实现类实例,如果有,直接返回,如果没有,创建
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 创建 DubboProtocol 实例(这里可以看出,要求SPI实现类必须具有无参构造器),放到缓存中 Map<Class<?>, Object> EXTENSION_INSTANCES
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// Dubbo IOC
injectExtension(instance);
// Dubbo AOP
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw ...;
}
}
来看 AOP 的这段源码:
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
在第二小节分析《加载 SPI 实现》的过程中,会把指定的 SPI 接口的 Wrapper 类(实现了 SPI 接口 + 入参仅有一个SPI接口参数)放到 Set<Class<?>> cachedWrapperClasses
中,之后遍历 cachedWrapperClasses
循环调用 Wrapper 的单参构造器创建实例,形成一条 Wrapper 链,链尾是进行了 IOC 操作的最底层的 SPI 实现类
六、@Activate 条件注解
@Activate
类似于 spring@Conditional
注解,可根据一定的条件(group
- 比如 provider、value
- spiKey,比如 dubbo)自动加载多个 SPI 实现类,并且实现了这些 SPI 实现类加载的先后顺序(绝对排序 -order
和相对排序 -before/after
), 下面我将 第3章 Dubbo SPI 使用姿势 分析的@Activate
和激活点加载流程
再贴一遍,非常重要
@Activate
注解:
String[] group() default {}
:
- 如果
getActivateExtension
接口传入的 group 参数为 null 或者 length==0,表示不限制 group,则允许加载当前 SPI 实现;- 查看当前的 SPI 实现的
@Activate
注解中的参数 groups 是否包含传入的限制参数 group,如果包含,则允许加载当前的 SPI 实现。
String[] value() default {}
:
- 如果当前的 SPI 实现的
@Activate
注解没有 value() 属性,则认为默认是允许当前的 SPI 实现加载的;- 如果 value() 中的任一值出现在当前传入的
URL#getParameters()
中的一个参数名,则认为默认是允许当前的 SPI 实现加载的;
@Activate
/**
* 激活点。该注解用于根据已有的条件来自动激活满足条件的多个扩展实现类。
* eg. @Activate 可以被用于加载多个 Filter 扩展实现类。
* SPI 服务发现者可以通过调用 {@link ExtensionLoader#getActivateExtension(URL, String, String)} 来找出所有满足给定条件的的激活扩展实现类。
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/* 当 String[] group() 包含当前传入的 group,则该条件满足,进行其他条件的判断 */
String[] group() default {};
/* 当 String[] value() 中的任一值出现在当前传入的 URL#parameters() 中的一个参数名 */
String[] value() default {};
/* String[] before() 指定当前的 SPI 扩展实现类需要放在哪些扩展实现类之前 */
String[] before() default {};
/* String[] after() 指定当前的 SPI 扩展实现类需要放在哪些扩展实现类之后 */
String[] after() default {};
/* 指定绝对的顺序 order,order 越小,越靠前 */
int order() default 0;
}
这里尤其注意
String[] value()
的含义,可结合 第3章 Dubbo SPI 使用姿势 中@Activate
的例子进行理解。
条件激活点加载原理
激活点加载流程
(仅列出最常用的主线,其他支线见后续的源码分析及源码注释):
- 首先获取除了传入的 spiKey 集合(values)指定的 spi 激活点实现类(称为 default 激活点),之后对 default 激活点进行排序
加载 default 激活点的规则:
- 如果
getActivateExtension
接口传入的 group 参数为 null 或者 length==0,表示不限制 group,则允许加载当前 SPI 实现;- 如果 group 有效,则查看当前的 SPI 实现的
@Activate
注解中的参数 groups 是否包含传入的限制参数 group,如果包含,则允许加载当前的 SPI 实现;- 传入的spiKey 集合(values)不包含(-name,name 表示当前处理到的 SPI 激活点的 spiKey):也就是说配置 -name 可以排除掉某个实现类;
- 如果当前的 SPI 实现的
@Activate
注解没有 value() 属性,则认为默认是加载的,直接返回 true;- 如果当前的 SPI 实现的 @Activate 注解有 value() 属性,遍历每一个元素,如果
url.getParameters()
中的参数名包含了其中任意一个元素(也就是说String[] value()
中的任一值出现在当前传入的URL#parameters()
中的一个参数名)
- 之后获取传入的 spiKey 集合(values)指定的 SPI 激活点实现类(称为 usr 激活点)
- 传入的spiKey 集合(values)不包含(-name,name 表示当前处理到的 SPI 激活点的 spiKey):也就是说配置 -name 可以排除掉某个实现类;
- 将 default 激活点集合和 usr 激活点集合放到一个集合中,default 在前,usr 在后
public List<T> getActivateExtension(URL url, String[] values, String group) {
// 只加载 names 之外的 spi 实现(或者称为 default 实现) + 最后将 names 指定的 spi 实现集合(usrs)也加入到了 exts 中
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
// 如果不包含"-default",
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
// 1. 加载三个 spi 目录下的 Class<?> type(eg. Protocol) 指定的 spi 接口的所有适配类、包装类和普通实现类,进而初始化各个缓存项
getExtensionClasses();
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey(); // spiKey
Activate activate = entry.getValue(); // 对应的注解
/**
* 匹配 group,两条规则:|| 关系
* 1. 如果接口传入的 group 参数为 null 或者 length==0,表示不限制 group
* 2. 查看当前的 @Activate 注解中的参数 groups 是否包含传入的限制参数 group
*/
if (isMatchGroup(group, activate.group())) {
// 如果 group 匹配,则创建相应的 SPI 实现类
T ext = getExtension(name);
/**
* 匹配 value,三条规则:&& 关系
* 1. 如果传入的 spi key 集合(names)不包含当前遍历的 spi 实现(spi-key=name):也就是说这个 for 循环只加载 names 之外的 spi 实现,names 指定的在 usrs 中加载
* 2. 传入的 spi key 集合(names)不包含(-name):也就是说配置 -name 可以排除掉某个实现类
* 3. 判断是否激活当前的 spi:|| 关系
* - 如果 @Activate 注解没有 value() 属性,则认为默认是加载的,直接返回 true
* - 如果 @Activate 注解有 value() 属性,遍历每一个元素,
* 如果 (url.getParameters() 中的参数名包含了其中任意一个元素 || url.getParameters() 中的参数名是以 ".任一元素" 结尾)
* && url 中的对应的参数名的参数值不为空
*/
if (!names.contains(name)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
// 添加到附加列表中
exts.add(ext);
}
}
}
// 对默认 spi 激活扩展进行排序
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
// 只加载 names 指定的 spi 实现
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
// 如果传入的是 name 以 - 开头 || 传入的 names 包含 -name,则不加载该激活点
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
// 如果传入的 name 的名字是 default,则将之前所加载的 usrs 扩展点放到 exts 的最前边;
// 否则,创建扩展点,将扩展点加入到 usrs 中
if (Constants.DEFAULT_KEY.equals(name)) {
if (!usrs.isEmpty()) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
// 由于使用了 List,所以 usrs 放在了 exts 之后
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;
}
/**
* 查看当前的 @Activate 注解中的参数 groups 是否包含传入的限制参数 group
* @param group 传入的限制参数
* @param groups @Activate 注解中的参数
*/
private boolean isMatchGroup(String group, String[] groups) {
// 如果传入的 group 参数为 null 或者 length==0,表示不限制 group
if (group == null || group.length() == 0) {
return true;
}
if (groups != null && groups.length > 0) {
for (String g : groups) {
if (group.equals(g)) {
return true;
}
}
}
return false;
}
private boolean isActive(Activate activate, URL url) {
// 如果 @Activate 注解没有 value() 属性,则认为默认是加载的,直接返回 true
String[] keys = activate.value();
if (keys.length == 0) {
return true;
}
// 如果 @Activate 注解有 value() 属性,遍历每一个元素,
// 如果 (url.getParameters() 中的参数名包含了其中任意一个元素 || url.getParameters() 中的参数名是以 ".任一元素" 结尾)
// && url 中的对应的参数名的参数值不为空
for (String key : keys) {
for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
if ((k.equals(key) || k.endsWith("." + key)) // url.getParameters() 中包含了其中任意一个元素
&& ConfigUtils.isNotEmpty(v)) {
return true;
}
}
}
return false;
}
网友评论