Spring框架中也实现了类似EventBus中的事件监听方法。Spring中的很多功能都有赖于此实现,让我们一起来学习一下。
基于ApplicationEvent和ApplicationListener的实现
[图片:image.png] 如上图所示,这是Spring事件监听机制中两个重要类(ApplicationEvent与ApplicationListener)的继承关系。下面我们基于这两个类实现一个简单的Demo。
/**
 * Custom event type for the demo; an event class must extend {@code ApplicationEvent}.
 *
 * @author tomxin
 * @since 2018-12-16
 */
public class DemoEvent extends ApplicationEvent {

    /** Text payload carried by the event; {@code null} until {@link #setMessage(String)} is called. */
    private String message;

    /**
     * Creates a new event for the given source.
     *
     * @param source the object on which the event initially occurred (never {@code null})
     */
    public DemoEvent(Object source) {
        super(source);
    }

    /**
     * Returns the text payload of this event.
     *
     * @return the message previously stored via {@link #setMessage(String)}, or {@code null}
     */
    public String getMessage() {
        return message;
    }

    /**
     * Stores the text payload of this event.
     *
     * @param message the message to carry
     */
    public void setMessage(String message) {
        this.message = message;
    }
}
/**
 * Event listener for the demo. It is declared for {@code ApplicationEvent},
 * so it is handed every event type and filters for {@code DemoEvent} itself.
 *
 * @author tomxin
 * @since 2018-12-16
 */
public class DemoListener implements ApplicationListener<ApplicationEvent> {

    /** Delegate that performs the actual handling of a received message. */
    private ReceiverService receiverService;

    /**
     * Creates the listener together with its own {@code ReceiverService}.
     */
    public DemoListener() {
        this.receiverService = new ReceiverService();
    }

    /**
     * Handles an incoming event: a {@code DemoEvent} has its message forwarded
     * to the receiver service, any other event only has its class printed.
     *
     * @param event the event delivered by the multicaster
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        // Guard clause: anything that is not our own event type is just reported.
        if (!(event instanceof DemoEvent)) {
            System.out.println("event class " + event.getClass());
            return;
        }
        DemoEvent demoEvent = (DemoEvent) event;
        receiverService.receive(demoEvent.getMessage());
    }
}
/**
 * Receiving side of the demo: the class that actually processes a delivered message.
 *
 * @author tomxin
 * @since 2018-12-16
 */
public class ReceiverService {

    /**
     * Prints the received message to standard output, prefixed with
     * {@code "receiver message: "}.
     *
     * @param message the message text to report; {@code null} is printed as {@code "null"}
     */
    public void receive(String message) {
        // The separator keeps the label and the payload from running together in the output.
        System.out.println("receiver message: " + message);
    }
}
/**
 * Publishing side of the demo: builds an event and broadcasts it through a
 * manually assembled multicaster.
 *
 * @author tomxin
 * @since 2018-12-16
 */
public class PublishService {

    /**
     * Creates a {@code DemoEvent}, registers a fresh {@code DemoListener} on a new
     * {@code SimpleApplicationEventMulticaster} and multicasts the event to it.
     */
    public void sendMessage() {
        // The event object carries the message content; it must extend ApplicationEvent.
        DemoEvent demoEvent = new DemoEvent("tomxin");
        demoEvent.setMessage("demo event message");

        // The multicaster is the broadcaster that sits between publisher and listeners.
        ApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();

        // Instantiate the listener and add it to the multicaster's listener list.
        multicaster.addApplicationListener(new DemoListener());

        // Deliver the event to every registered listener.
        multicaster.multicastEvent(demoEvent);
    }
}
// main 方法
public class EventTest {
public static void main(String[] args) {
// 初始化发布者实体类
PublishService publishService = new PublishService();
// 发送消息
publishService.sendMessage();
}
}
上面代码中的实现方式,并没有与Spring容器有直接关系,只是演示这几个类之间的关系。在实际项目中,如果我们使用Spring的事件广播机制,不需要手动将监听者注册到广播器中,也不需要实例化广播器,Spring都会帮我们做好这些。同时需要注意的是,在Spring项目中,实现ApplicationListener<ApplicationEvent>的监听者会接收到所有Spring的事件通知,我们需要根据实际情况判断需要的事件类(例如像上面的DemoListener那样用instanceof判断,或者直接把泛型参数声明为具体的事件类型,如ApplicationListener<DemoEvent>),来达到自己的目的。
ApplicationEventMulticaster源码分析
通过上面的代码大家也可以看到,ApplicationEventMulticaster广播器实际上起到了中介者的作用。它连接了事件的发布者与接收者,使这两者解耦。下面我们一起看一下它内部的实现逻辑。
继承关系
[图片:image.png] 从图中可以看出,ApplicationEventMulticaster的方法主要分为三大类:添加监听者、移除监听者、消息广播。
AbstractApplicationEventMulticaster源码分析
该类中主要实现了事件监听者的注册和移除。
/**
 * Base multicaster that implements listener registration, listener removal and
 * the lookup of listeners matching a given event type and source type.
 * Lookup results are cached per event type / source type combination; the
 * actual event dispatch is not implemented here.
 */
public abstract class AbstractApplicationEventMulticaster
        implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {

    /** Holds every registered listener instance and listener bean name (unfiltered). */
    private final ListenerRetriever defaultRetriever = new ListenerRetriever(false);

    /** Pre-filtered retrievers per event/source type; cleared whenever the registrations change. */
    final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);

    @Nullable
    private ClassLoader beanClassLoader;

    @Nullable
    private BeanFactory beanFactory;

    /** Lock object for registration changes; replaced by the factory's singleton mutex in setBeanFactory. */
    private Object retrievalMutex = this.defaultRetriever;

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.beanClassLoader = classLoader;
    }

    /**
     * Stores the bean factory. For a {@code ConfigurableBeanFactory} this also
     * adopts the factory's bean class loader (if none was set yet) and switches
     * the lock object to the factory's singleton mutex.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        if (beanFactory instanceof ConfigurableBeanFactory) {
            ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory;
            if (this.beanClassLoader == null) {
                this.beanClassLoader = cbf.getBeanClassLoader();
            }
            this.retrievalMutex = cbf.getSingletonMutex();
        }
    }

    /**
     * Returns the associated bean factory.
     * @throws IllegalStateException if no bean factory has been set
     */
    private BeanFactory getBeanFactory() {
        if (this.beanFactory == null) {
            throw new IllegalStateException("ApplicationEventMulticaster cannot retrieve listener beans " +
                    "because it is not associated with a BeanFactory");
        }
        return this.beanFactory;
    }

    /**
     * Registers a listener instance and invalidates the retriever cache.
     */
    @Override
    public void addApplicationListener(ApplicationListener<?> listener) {
        synchronized (this.retrievalMutex) {
            // Check whether the listener is an AOP proxy wrapping a target; if so,
            // getSingletonTarget hands back the original target object.
            // Explicitly remove target for a proxy, if registered already,
            // in order to avoid double invocations of the same listener.
            Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
            if (singletonTarget instanceof ApplicationListener) {
                this.defaultRetriever.applicationListeners.remove(singletonTarget);
            }
            this.defaultRetriever.applicationListeners.add(listener);
            this.retrieverCache.clear();
        }
    }

    /**
     * Registers a listener by bean name and invalidates the retriever cache.
     */
    @Override
    public void addApplicationListenerBean(String listenerBeanName) {
        synchronized (this.retrievalMutex) {
            this.defaultRetriever.applicationListenerBeans.add(listenerBeanName);
            this.retrieverCache.clear();
        }
    }

    /**
     * Removes a listener instance and invalidates the retriever cache.
     */
    @Override
    public void removeApplicationListener(ApplicationListener<?> listener) {
        synchronized (this.retrievalMutex) {
            this.defaultRetriever.applicationListeners.remove(listener);
            this.retrieverCache.clear();
        }
    }

    /**
     * Removes a listener bean name and invalidates the retriever cache.
     */
    @Override
    public void removeApplicationListenerBean(String listenerBeanName) {
        synchronized (this.retrievalMutex) {
            this.defaultRetriever.applicationListenerBeans.remove(listenerBeanName);
            this.retrieverCache.clear();
        }
    }

    /**
     * Removes all listener instances and listener bean names and invalidates the retriever cache.
     */
    @Override
    public void removeAllListeners() {
        synchronized (this.retrievalMutex) {
            this.defaultRetriever.applicationListeners.clear();
            this.defaultRetriever.applicationListenerBeans.clear();
            this.retrieverCache.clear();
        }
    }

    /**
     * Return a Collection containing all ApplicationListeners.
     * @return a Collection of ApplicationListeners
     * @see org.springframework.context.ApplicationListener
     */
    protected Collection<ApplicationListener<?>> getApplicationListeners() {
        synchronized (this.retrievalMutex) {
            return this.defaultRetriever.getApplicationListeners();
        }
    }

    /**
     * Return a Collection of ApplicationListeners matching the given
     * event type. Non-matching listeners get excluded early.
     * @param event the event to be propagated. Allows for excluding
     * non-matching listeners early, based on cached matching information.
     * @param eventType the event type
     * @return a Collection of ApplicationListeners
     * @see org.springframework.context.ApplicationListener
     */
    protected Collection<ApplicationListener<?>> getApplicationListeners(
            ApplicationEvent event, ResolvableType eventType) {

        Object source = event.getSource();
        Class<?> sourceType = (source != null ? source.getClass() : null);
        ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);

        // Quick check for existing entry on ConcurrentHashMap...
        ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
        if (retriever != null) {
            return retriever.getApplicationListeners();
        }

        if (this.beanClassLoader == null ||
                (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
                        (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            // Fully synchronized building and caching of a ListenerRetriever
            synchronized (this.retrievalMutex) {
                // Re-check under the lock: another thread may have populated the entry meanwhile.
                retriever = this.retrieverCache.get(cacheKey);
                if (retriever != null) {
                    return retriever.getApplicationListeners();
                }
                retriever = new ListenerRetriever(true);
                Collection<ApplicationListener<?>> listeners =
                        retrieveApplicationListeners(eventType, sourceType, retriever);
                this.retrieverCache.put(cacheKey, retriever);
                return listeners;
            }
        }
        else {
            // No ListenerRetriever caching -> no synchronization necessary
            return retrieveApplicationListeners(eventType, sourceType, null);
        }
    }

    /**
     * Actually retrieve the application listeners for the given event and source type.
     * @param eventType the event type
     * @param sourceType the event source type
     * @param retriever the ListenerRetriever, if supposed to populate one (for caching purposes)
     * @return the pre-filtered list of application listeners for the given event and source type
     */
    private Collection<ApplicationListener<?>> retrieveApplicationListeners(
            ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {

        List<ApplicationListener<?>> allListeners = new ArrayList<>();
        Set<ApplicationListener<?>> listeners;
        Set<String> listenerBeans;
        // Work on snapshots so the filtering below runs without holding the lock.
        synchronized (this.retrievalMutex) {
            listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
            listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
        }
        for (ApplicationListener<?> listener : listeners) {
            if (supportsEvent(listener, eventType, sourceType)) {
                if (retriever != null) {
                    retriever.applicationListeners.add(listener);
                }
                allListeners.add(listener);
            }
        }
        if (!listenerBeans.isEmpty()) {
            BeanFactory beanFactory = getBeanFactory();
            for (String listenerBeanName : listenerBeans) {
                try {
                    // First pass on the declared type only, before fetching the bean instance.
                    Class<?> listenerType = beanFactory.getType(listenerBeanName);
                    if (listenerType == null || supportsEvent(listenerType, eventType)) {
                        ApplicationListener<?> listener =
                                beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                        if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
                            if (retriever != null) {
                                retriever.applicationListenerBeans.add(listenerBeanName);
                            }
                            allListeners.add(listener);
                        }
                    }
                }
                catch (NoSuchBeanDefinitionException ex) {
                    // Singleton listener instance (without backing bean definition) disappeared -
                    // probably in the middle of the destruction phase
                }
            }
        }
        AnnotationAwareOrderComparator.sort(allListeners);
        return allListeners;
    }

    /**
     * Filter a listener early through checking its generically declared event
     * type before trying to instantiate it.
     * <p>If this method returns {@code true} for a given listener as a first pass,
     * the listener instance will get retrieved and fully evaluated through a
     * {@link #supportsEvent(ApplicationListener,ResolvableType, Class)} call afterwards.
     * @param listenerType the listener's type as determined by the BeanFactory
     * @param eventType the event type to check
     * @return whether the given listener should be included in the candidates
     * for the given event type
     */
    protected boolean supportsEvent(Class<?> listenerType, ResolvableType eventType) {
        if (GenericApplicationListener.class.isAssignableFrom(listenerType) ||
                SmartApplicationListener.class.isAssignableFrom(listenerType)) {
            return true;
        }
        ResolvableType declaredEventType = GenericApplicationListenerAdapter.resolveDeclaredEventType(listenerType);
        return (declaredEventType == null || declaredEventType.isAssignableFrom(eventType));
    }

    /**
     * Determine whether the given listener supports the given event.
     * <p>The default implementation detects the {@link SmartApplicationListener}
     * and {@link GenericApplicationListener} interfaces. In case of a standard
     * {@link ApplicationListener}, a {@link GenericApplicationListenerAdapter}
     * will be used to introspect the generically declared type of the target listener.
     * @param listener the target listener to check
     * @param eventType the event type to check against
     * @param sourceType the source type to check against
     * @return whether the given listener should be included in the candidates
     * for the given event type
     */
    protected boolean supportsEvent(
            ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) {

        GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ?
                (GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
        return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
    }


    /**
     * Cache key for ListenerRetrievers, based on event type and source type.
     */
    private static final class ListenerCacheKey implements Comparable<ListenerCacheKey> {

        private final ResolvableType eventType;

        @Nullable
        private final Class<?> sourceType;

        public ListenerCacheKey(ResolvableType eventType, @Nullable Class<?> sourceType) {
            Assert.notNull(eventType, "Event type must not be null");
            this.eventType = eventType;
            this.sourceType = sourceType;
        }

        /**
         * Two keys are equal when both their event type and their source type are equal.
         * Returns {@code false} for {@code null} and for objects of any other class.
         */
        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            // Guard the cast: equals must answer false, not throw, for null or foreign types.
            if (!(other instanceof ListenerCacheKey)) {
                return false;
            }
            ListenerCacheKey otherKey = (ListenerCacheKey) other;
            return (this.eventType.equals(otherKey.eventType) &&
                    ObjectUtils.nullSafeEquals(this.sourceType, otherKey.sourceType));
        }

        @Override
        public int hashCode() {
            return this.eventType.hashCode() * 29 + ObjectUtils.nullSafeHashCode(this.sourceType);
        }

        @Override
        public String toString() {
            return "ListenerCacheKey [eventType = " + this.eventType + ", sourceType = " + this.sourceType + "]";
        }

        /**
         * Orders keys by the event type's string form, then by source type name,
         * with a {@code null} source type sorting first.
         */
        @Override
        public int compareTo(ListenerCacheKey other) {
            int result = this.eventType.toString().compareTo(other.eventType.toString());
            if (result == 0) {
                if (this.sourceType == null) {
                    return (other.sourceType == null ? 0 : -1);
                }
                if (other.sourceType == null) {
                    return 1;
                }
                result = this.sourceType.getName().compareTo(other.sourceType.getName());
            }
            return result;
        }
    }


    /**
     * Helper class that encapsulates a specific set of target listeners,
     * allowing for efficient retrieval of pre-filtered listeners.
     * <p>An instance of this helper gets cached per event type and source type.
     */
    private class ListenerRetriever {

        public final Set<ApplicationListener<?>> applicationListeners;

        public final Set<String> applicationListenerBeans;

        private final boolean preFiltered;

        public ListenerRetriever(boolean preFiltered) {
            this.applicationListeners = new LinkedHashSet<>();
            this.applicationListenerBeans = new LinkedHashSet<>();
            this.preFiltered = preFiltered;
        }

        /**
         * Returns the listener instances held by this retriever plus the beans
         * resolved from the held bean names, sorted with
         * {@code AnnotationAwareOrderComparator}.
         */
        public Collection<ApplicationListener<?>> getApplicationListeners() {
            List<ApplicationListener<?>> allListeners = new ArrayList<>(
                    this.applicationListeners.size() + this.applicationListenerBeans.size());
            allListeners.addAll(this.applicationListeners);
            if (!this.applicationListenerBeans.isEmpty()) {
                BeanFactory beanFactory = getBeanFactory();
                for (String listenerBeanName : this.applicationListenerBeans) {
                    try {
                        ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                        // A pre-filtered retriever skips the duplicate check.
                        if (this.preFiltered || !allListeners.contains(listener)) {
                            allListeners.add(listener);
                        }
                    }
                    catch (NoSuchBeanDefinitionException ex) {
                        // Singleton listener instance (without backing bean definition) disappeared -
                        // probably in the middle of the destruction phase
                    }
                }
            }
            AnnotationAwareOrderComparator.sort(allListeners);
            return allListeners;
        }
    }

}
SimpleApplicationEventMulticaster源码分析,该类主要实现了事件的分发方法。
/**
 * Multicaster that implements the actual event dispatch: every matching
 * listener is invoked, either directly in the calling thread or through a
 * configured {@link Executor}.
 */
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

    @Nullable
    private Executor taskExecutor;

    @Nullable
    private ErrorHandler errorHandler;


    /**
     * Create a new SimpleApplicationEventMulticaster.
     */
    public SimpleApplicationEventMulticaster() {
    }

    /**
     * Create a new SimpleApplicationEventMulticaster for the given BeanFactory.
     */
    public SimpleApplicationEventMulticaster(BeanFactory beanFactory) {
        setBeanFactory(beanFactory);
    }


    /**
     * Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})
     * to invoke each listener with.
     * <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},
     * executing all listeners synchronously in the calling thread.
     * <p>Consider specifying an asynchronous task executor here to not block the
     * caller until all listeners have been executed. However, note that asynchronous
     * execution will not participate in the caller's thread context (class loader,
     * transaction association) unless the TaskExecutor explicitly supports this.
     * @see org.springframework.core.task.SyncTaskExecutor
     * @see org.springframework.core.task.SimpleAsyncTaskExecutor
     */
    public void setTaskExecutor(@Nullable Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Return the current task executor for this multicaster.
     */
    @Nullable
    protected Executor getTaskExecutor() {
        return this.taskExecutor;
    }

    /**
     * Set the {@link ErrorHandler} to invoke in case an exception is thrown
     * from a listener.
     * <p>Default is none, with a listener exception stopping the current
     * multicast and getting propagated to the publisher of the current event.
     * If a {@linkplain #setTaskExecutor task executor} is specified, each
     * individual listener exception will get propagated to the executor but
     * won't necessarily stop execution of other listeners.
     * <p>Consider setting an {@link ErrorHandler} implementation that catches
     * and logs exceptions (a la
     * {@link org.springframework.scheduling.support.TaskUtils#LOG_AND_SUPPRESS_ERROR_HANDLER})
     * or an implementation that logs exceptions while nevertheless propagating them
     * (e.g. {@link org.springframework.scheduling.support.TaskUtils#LOG_AND_PROPAGATE_ERROR_HANDLER}).
     * @since 4.1
     */
    public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Return the current error handler for this multicaster.
     * @since 4.1
     */
    @Nullable
    protected ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }


    /**
     * Multicasts the event, resolving the event type from the event instance.
     */
    @Override
    public void multicastEvent(ApplicationEvent event) {
        multicastEvent(event, resolveDefaultEventType(event));
    }

    /**
     * Multicasts the event to every listener matching the given event type.
     * If a task executor is set, each listener invocation is handed to it;
     * otherwise the listener is invoked directly in the calling thread.
     * @param event the event to multicast
     * @param eventType the type of the event; resolved from the instance if {@code null}
     */
    @Override
    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            if (executor != null) {
                executor.execute(() -> invokeListener(listener, event));
            }
            else {
                invokeListener(listener, event);
            }
        }
    }

    private ResolvableType resolveDefaultEventType(ApplicationEvent event) {
        return ResolvableType.forInstance(event);
    }

    /**
     * Invoke the given listener with the given event.
     * <p>With an error handler set, anything thrown by the listener is passed
     * to it; without one, the exception propagates to the caller.
     * @param listener the ApplicationListener to invoke
     * @param event the current event to propagate
     * @since 4.1
     */
    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                doInvokeListener(listener, event);
            }
            catch (Throwable err) {
                errorHandler.handleError(err);
            }
        }
        else {
            doInvokeListener(listener, event);
        }
    }

    /**
     * Calls the listener. A {@code ClassCastException} that has no message, or
     * whose message names the event class as the class that could not be cast,
     * is logged at debug level and suppressed; any other one is rethrown.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {
            String msg = ex.getMessage();
            if (msg == null || matchesClassCastMessage(msg, event.getClass().getName())) {
                // Possibly a lambda-defined listener which we could not resolve the generic event type for
                // -> let's suppress the exception and just log a debug message.
                Log logger = LogFactory.getLog(getClass());
                if (logger.isDebugEnabled()) {
                    logger.debug("Non-matching event type for listener: " + listener, ex);
                }
            }
            else {
                throw ex;
            }
        }
    }

    /**
     * Checks whether a {@code ClassCastException} message reports the event
     * class as the class that failed to be cast, across the message formats
     * of different Java versions.
     * @param classCastMessage the exception message
     * @param eventClassName the fully-qualified name of the event class
     * @return {@code true} if the message starts with the event class in one of the known formats
     */
    private boolean matchesClassCastMessage(String classCastMessage, String eventClassName) {
        // On Java 8, the message simply starts with the class name: "java.lang.String cannot be cast..."
        if (classCastMessage.startsWith(eventClassName)) {
            return true;
        }
        // On Java 11+, the message starts with "class ...": "class java.lang.String cannot be cast..."
        if (classCastMessage.startsWith("class " + eventClassName)) {
            return true;
        }
        // On Java 9, the message contains the module name: "java.base/java.lang.String cannot be cast..."
        int moduleSeparatorIndex = classCastMessage.indexOf('/');
        if (moduleSeparatorIndex != -1 && classCastMessage.startsWith(eventClassName, moduleSeparatorIndex + 1)) {
            return true;
        }
        // Assuming an unrelated class cast failure...
        return false;
    }

}
网友评论