Spring事件监听
1.举例
(1)定义监听器监听的对象BaseFetchDataEvent
@Getter
@Setter
@ToString
public class BaseFetchDataEvent extends ApplicationEvent {
/** 测试数据对象 */
private ObjectDto objectDto;
/** 计数器 **/
private CountDownLatch countDownLatch;
/**
* Create a new ApplicationEvent.
*
* @param source the object on which the event initially occurred (never {@code null})
*/
public BaseFetchDataEvent(Object source) {
super(source);
}
}
(2)创建一个测试对象实体类
@Data
public class ObjectDto {
/** 主键ID*/
private Long id;
/** 任务ID*/
private Long taskId;
/** status*/
private Integer status;
}
(3)创建两个监听器,直接监听BaseFetchDataEvent事件
@Slf4j
@Order(1)
@EasyService
public class OneBaseDataEventListener implements ApplicationListener<BaseFetchDataEvent> {
@Override
public void onApplicationEvent(BaseFetchDataEvent event) {
if (event == null) return;
ObjectDto dto = event.getObjectDto();
CountDownLatch countDownLatch = event.getCountDownLatch();
try {
dto.setStatus(2);
} catch (Exception e) {
log.error("e:{}",e);
}finally {
countDownLatch.countDown();
}
}
}
@Slf4j
@Order(2)
@EasyService
public class TwoBaseDataEventListener implements ApplicationListener<BaseFetchDataEvent> {
@Override
public void onApplicationEvent(BaseFetchDataEvent event) {
if (event == null) return;
ObjectDto dto = event.getObjectDto();
CountDownLatch countDownLatch = event.getCountDownLatch();
try {
dto.setId(1l);
dto.setTaskId(1001l);
} catch (Exception e) {
log.error("e:{}",e);
}finally {
countDownLatch.countDown();
}
}
}
(4)定义测试类:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = Application.class)
@ActiveProfiles("dev")
@WebAppConfiguration
@Slf4j
public class ApplicationListenerTest {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Test
public void listenerTest(){
ObjectDto objectDto = new ObjectDto();
try {
CountDownLatch countDownLatch = new CountDownLatch(2);
BaseFetchDataEvent fetchDataEvent = new BaseFetchDataEvent("基础数据抓取事件");
fetchDataEvent.setCountDownLatch(countDownLatch);
fetchDataEvent.setObjectDto(objectDto);
applicationEventPublisher.publishEvent(fetchDataEvent);
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(objectDto.toString());
}
}
(5)执行结果
监听器监听到
BaseFetchDataEvent
事件,并调用onApplicationEvent
方法
2源码分析
ApplicationEventPublisher
接口(封装事件发布功能)提供了一个方法publishEvent
,将事件发送出去,通知应用所有已注册且匹配的监听器此ApplicationEvent;通知应用所有已注册且匹配的监听器此Event ,如果这个Event不是一个ApplicationEvent
,则其被包裹于PayloadApplicationEvent
[站外图片上传中...(image-f7841a-1551080194999)]
核心是:getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
//事件发布委托给ApplicationEventMulticaster
来执行getApplicationEventMulticaster()
方法是获取所有的监听器。
然后ApplicationEventMulticaster
的multicastEvent
方法的实现在SimpleApplicationEventMulticaster
类中:
获取event所有的监听事件,然后遍历执行监听器的onApplicationEvent方法,可知此方法是核心方法,是真正调用监听器的地方;
从下面代码可以看到,找到已注册的ApplicationListener
,逐个调用invokeListener
方法,将ApplicationListener
和事件作为入参传进去就完成了广播;
[站外图片上传中...(image-368b29-1551080194999)]
最终调用invokelistener
,执行onApplicationEvent(event)
;invokeListener
方法:,ApplicationListener
是代表监听的接口,只要调用这个接口的方法并且将event
作为入参传进去,那么每个监听器就可以按需要自己来处理这条广播消息了,
[站外图片上传中...(image-9f76b5-1551080194999)]
如果多线程同时发广播,会不会有线程同步的问题?
唯一有可能出现问题的地方在:multicastEvent方法获取ApplicationListener
的时候可能出现同步问题,看代码:
[站外图片上传中...(image-40aed3-1551080194999)]
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
//缓存的key有两个维度:消息来源+消息类型(关于消息来源可见ApplicationEvent构造方法的入参)
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// retrieverCache是ConcurrentHashMap对象,所以是线程安全的,
// ListenerRetriever中有个监听器的集合,并有些简单的逻辑封装,
//调用它的getApplicationListeners方法返回的监听类集合是排好序的(order注解排序)
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
//如果retrieverCache中找到对应的监听器集合,就立即返回了
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
//如果retrieverCache中没有数据,就在此查出数据并放入缓存,
//先加锁
synchronized (this.retrievalMutex) {
//双重判断的第二重,避免自己在BLOCK的时候其他线程已经将数据放入缓存了
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
//新建一个ListenerRetriever对象
retriever = new ListenerRetriever(true);
//retrieveApplicationListeners方法复制找出某个消息类型加来源类型对应的所有监听器
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
//存入retrieverCache
this.retrieverCache.put(cacheKey, retriever);
//返回结果
return listeners;
}
}
else {
// No ListenerRetriever caching -> no synchronization necessary
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
在广播消息的时刻,如果某个类型的消息在缓存中找不到对应的监听器集合,就调用retrieveApplicationListeners方法去找出符合条件的所有监听器,然后放入这个集合
3 如何具备消息发送能力
spring容器初始化的时候会对实现了Aware接口的bean做相关的特殊处理,其中就包含ApplicationEventPublisherAware
这个与广播发送相关的接口
在spring容器初始化的时候,AbstractApplicationContext
类的prepareBeanFactory
方法中为所有bean准备了一个后置处理器ApplicationListenerDetector
,来看看它的postProcessAfterInitialization
方法的代码,也就是bean在实例化之后要做的事情:
[站外图片上传中...(image-c0dd8b-1551080194999)]
核心的一句:
this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
此代码注册监听器,其实就是保存在成员变量applicationEventMulticaster
的成员变量defaultRetriever
的集合applicationListeners
中
即:当前bean实现了ApplicationListener
接口,就会调用this.applicationContext.addApplicationListener
方法将当前bean注册到applicationContext
的监听器集合中,后面有广播就直接找到这些监听器,调用每个监听器的onApplicationEvent
方法;
自定义的消息监听器可以指定消息类型,所有的广播消息中,这个监听器只会收到自己指定的消息类型的广播,spring是如何做到这一点的?
4 如何做到只接收指定类型的
自定义监听器只接收指定类型的消息,以下两种方案都可以实现:
1.注册监听器的时候,将监听器和消息类型绑定; 2.广播的时候,按照这条消息的类型去找指定了该类型的监听器,但不可能每条广播都去所有监听器里面找一遍,应该是说广播的时候会触发一次监听器和消息的类型绑定;
spring如何处理?
先看注册监听器的代码
按照之前的分析,注册监听发生在后置处理器ApplicationListenerDetector
中,看看this.applicationContext.addApplicationListener
这一行代码的内部逻辑:
[站外图片上传中...(image-f3b4e2-1551080194999)]
继续往下debug:
image
把监听器加入集合defaultRetriever.applicationListeners
中,这是个LinkedHashSet
实例
this.defaultRetriever.applicationListeners.add(listener);
注册监听器,其实就是把ApplicationListener
的实现类放入一个LinkedHashSet
的集合,此处没有任何与消息类型相关的操作,因此,监听器注册的时候并没有将消息类型和监听器绑定
去看广播消息的代码
来到SimpleApplicationEventMulticaster
的multicastEvent
方法
可以看到方法
getApplicationListeners(event, type)
,包含了listener
的type
;即,在发送消息的时候根据类型去找所有对应的监听器;
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
//缓存的key有两个维度:消息来源+消息类型(关于消息来源可见ApplicationEvent构造方法的入参)
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// retrieverCache是ConcurrentHashMap对象,所以是线程安全的,
// ListenerRetriever中有个监听器的集合,并有些简单的逻辑封装,调用它的getApplicationListeners方法返回的监听类集合是排好序的(order注解排序)
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
//如果retrieverCache中找到对应的监听器集合,就立即返回了
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
//如果retrieverCache中没有数据,就在此查出数据并放入缓存,
//先加锁
synchronized (this.retrievalMutex) {
//双重判断的第二重,避免自己在BLOCK的时候其他线程已经将数据放入缓存了
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
//新建一个ListenerRetriever对象
retriever = new ListenerRetriever(true);
//retrieveApplicationListeners方法复制找出某个消息类型加来源类型对应的所有监听器
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
//存入retrieverCache
this.retrieverCache.put(cacheKey, retriever);
//返回结果
return listeners;
}
}
else {
// No ListenerRetriever caching -> no synchronization necessary
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
在广播消息的时刻,如果某个类型的消息在缓存中找不到对应的监听器集合,就调用retrieveApplicationListeners
方法去找出符合条件的所有监听器,然后放入这个集合。
网友评论