美文网首页
Spring事件监听--源码总结

Spring事件监听--源码总结

作者: 未名枯草 | 来源:发表于2019-02-25 15:37 被阅读0次

    Spring事件监听

    1.举例

    (1)定义监听器监听的对象BaseFetchDataEvent

    @Getter
    @Setter
    @ToString
    public class BaseFetchDataEvent extends ApplicationEvent {
    
        /** 测试数据对象 */
        private ObjectDto objectDto;
    
        /** 计数器 **/
        private CountDownLatch countDownLatch;
    
        /**
         * Create a new ApplicationEvent.
         *
         * @param source the object on which the event initially occurred (never {@code null})
         */
        public BaseFetchDataEvent(Object source) {
            super(source);
        }
    }
    

    (2)创建一个测试对象实体类

    @Data
    public class ObjectDto {
    
        /** 主键ID*/
        private Long id;
    
        /** 任务ID*/
        private Long taskId;
    
        /** status*/
        private Integer status;
    
    }
    

    (3)创建两个监听器,直接监听BaseFetchDataEvent事件

    @Slf4j
    @Order(1)
    @EasyService
    public class OneBaseDataEventListener implements ApplicationListener<BaseFetchDataEvent> {
    
        @Override
        public void onApplicationEvent(BaseFetchDataEvent event) {
            if (event == null) return;
    
            ObjectDto dto = event.getObjectDto();
            CountDownLatch countDownLatch = event.getCountDownLatch();
            try {
                dto.setStatus(2);
            } catch (Exception e) {
                log.error("e:{}",e);
            }finally {
                countDownLatch.countDown();
            }
        }
    }
    
    
    @Slf4j
    @Order(2)
    @EasyService
    public class TwoBaseDataEventListener implements ApplicationListener<BaseFetchDataEvent> {
    
        @Override
        public void onApplicationEvent(BaseFetchDataEvent event) {
            if (event == null) return;
    
            ObjectDto dto = event.getObjectDto();
            CountDownLatch countDownLatch = event.getCountDownLatch();
            try {
                dto.setId(1l);
                dto.setTaskId(1001l);
            } catch (Exception e) {
                log.error("e:{}",e);
            }finally {
                countDownLatch.countDown();
            }
        }
    }
    
    

    (4)定义测试类:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = Application.class)
    @ActiveProfiles("dev")
    @WebAppConfiguration
    @Slf4j
    public class ApplicationListenerTest {
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
    
        @Test
        public void listenerTest(){
            ObjectDto objectDto = new ObjectDto();
            try {
                CountDownLatch countDownLatch = new CountDownLatch(2);
                BaseFetchDataEvent fetchDataEvent = new BaseFetchDataEvent("基础数据抓取事件");
                fetchDataEvent.setCountDownLatch(countDownLatch);
                fetchDataEvent.setObjectDto(objectDto);
                applicationEventPublisher.publishEvent(fetchDataEvent);
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(objectDto.toString());
        }
    }
    

    (5)执行结果

    image
    监听器监听到BaseFetchDataEvent事件,并调用onApplicationEvent方法

    2源码分析

    ApplicationEventPublisher接口(封装事件发布功能)提供了一个方法publishEvent,将事件发送出去,通知应用所有已注册且匹配的监听器此ApplicationEvent;通知应用所有已注册且匹配的监听器此Event ,如果这个Event不是一个ApplicationEvent,则其被包裹于PayloadApplicationEvent

    [站外图片上传中...(image-f7841a-1551080194999)]

    核心是:getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); //事件发布委托给ApplicationEventMulticaster来执行getApplicationEventMulticaster()方法是获取所有的监听器。

    image

    然后ApplicationEventMulticastermulticastEvent方法的实现在SimpleApplicationEventMulticaster类中:
    获取event所有的监听事件,然后遍历执行监听器的onApplicationEvent方法,可知此方法是核心方法,是真正调用监听器的地方;

    从下面代码可以看到,找到已注册的ApplicationListener,逐个调用invokeListener方法,将ApplicationListener和事件作为入参传进去就完成了广播;

    [站外图片上传中...(image-368b29-1551080194999)]
    最终调用invokelistener,执行onApplicationEvent(event)invokeListener方法:,ApplicationListener是代表监听的接口,只要调用这个接口的方法并且将event作为入参传进去,那么每个监听器就可以按需要自己来处理这条广播消息了,
    [站外图片上传中...(image-9f76b5-1551080194999)]

    如果多线程同时发广播,会不会有线程同步的问题?
    唯一有可能出现问题的地方在:multicastEvent方法获取ApplicationListener的时候可能出现同步问题,看代码:

    [站外图片上传中...(image-40aed3-1551080194999)]

    protected Collection<ApplicationListener<?>> getApplicationListeners(
                ApplicationEvent event, ResolvableType eventType) {
    
        Object source = event.getSource();
    
        Class<?> sourceType = (source != null ? source.getClass() : null);
        //缓存的key有两个维度:消息来源+消息类型(关于消息来源可见ApplicationEvent构造方法的入参)
        ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
    
        // retrieverCache是ConcurrentHashMap对象,所以是线程安全的,
        // ListenerRetriever中有个监听器的集合,并有些简单的逻辑封装,
        //调用它的getApplicationListeners方法返回的监听类集合是排好序的(order注解排序)
        ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
        if (retriever != null) {
            //如果retrieverCache中找到对应的监听器集合,就立即返回了
            return retriever.getApplicationListeners();
        }
    
        if (this.beanClassLoader == null ||
           (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
                        (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            //如果retrieverCache中没有数据,就在此查出数据并放入缓存,
            //先加锁
            synchronized (this.retrievalMutex) {
                //双重判断的第二重,避免自己在BLOCK的时候其他线程已经将数据放入缓存了
                retriever = this.retrieverCache.get(cacheKey);
                if (retriever != null) {
                    return retriever.getApplicationListeners();
                }
                //新建一个ListenerRetriever对象
                retriever = new ListenerRetriever(true);
                //retrieveApplicationListeners方法复制找出某个消息类型加来源类型对应的所有监听器
                Collection<ApplicationListener<?>> listeners =
                        retrieveApplicationListeners(eventType, sourceType, retriever);
                //存入retrieverCache  
                this.retrieverCache.put(cacheKey, retriever);
                //返回结果
                return listeners;
            }
        }
        else {
            // No ListenerRetriever caching -> no synchronization necessary
            return retrieveApplicationListeners(eventType, sourceType, null);
        }
    }
    

    在广播消息的时刻,如果某个类型的消息在缓存中找不到对应的监听器集合,就调用retrieveApplicationListeners方法去找出符合条件的所有监听器,然后放入这个集合

    3 如何具备消息发送能力

    spring容器初始化的时候会对实现了Aware接口的bean做相关的特殊处理,其中就包含ApplicationEventPublisherAware这个与广播发送相关的接口

    image

    在spring容器初始化的时候,AbstractApplicationContext类的prepareBeanFactory方法中为所有bean准备了一个后置处理器ApplicationListenerDetector,来看看它的postProcessAfterInitialization方法的代码,也就是bean在实例化之后要做的事情:
    [站外图片上传中...(image-c0dd8b-1551080194999)]
    核心的一句:
    this.applicationContext.addApplicationListener((ApplicationListener<?>) bean); 此代码注册监听器,其实就是保存在成员变量applicationEventMulticaster的成员变量defaultRetriever的集合applicationListeners
    即:当前bean实现了ApplicationListener接口,就会调用this.applicationContext.addApplicationListener方法将当前bean注册到applicationContext的监听器集合中,后面有广播就直接找到这些监听器,调用每个监听器的onApplicationEvent方法;

    自定义的消息监听器可以指定消息类型,所有的广播消息中,这个监听器只会收到自己指定的消息类型的广播,spring是如何做到这一点的?

    4 如何做到只接收指定类型的

    自定义监听器只接收指定类型的消息,以下两种方案都可以实现:
    1.注册监听器的时候,将监听器和消息类型绑定; 2.广播的时候,按照这条消息的类型去找指定了该类型的监听器,但不可能每条广播都去所有监听器里面找一遍,应该是说广播的时候会触发一次监听器和消息的类型绑定;

    spring如何处理?

    先看注册监听器的代码

    按照之前的分析,注册监听发生在后置处理器ApplicationListenerDetector中,看看this.applicationContext.addApplicationListener这一行代码的内部逻辑:

    [站外图片上传中...(image-f3b4e2-1551080194999)]
    继续往下debug:


    image

    把监听器加入集合defaultRetriever.applicationListeners中,这是个LinkedHashSet实例
    this.defaultRetriever.applicationListeners.add(listener);
    注册监听器,其实就是把ApplicationListener的实现类放入一个LinkedHashSet的集合,此处没有任何与消息类型相关的操作,因此,监听器注册的时候并没有将消息类型和监听器绑定

    去看广播消息的代码
    来到SimpleApplicationEventMulticastermulticastEvent方法

    image
    可以看到方法getApplicationListeners(event, type),包含了listenertype
    即,在发送消息的时候根据类型去找所有对应的监听器;
    protected Collection<ApplicationListener<?>> getApplicationListeners(
                ApplicationEvent event, ResolvableType eventType) {
    
        Object source = event.getSource();
    
        Class<?> sourceType = (source != null ? source.getClass() : null);
        //缓存的key有两个维度:消息来源+消息类型(关于消息来源可见ApplicationEvent构造方法的入参)
        ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
    
        // retrieverCache是ConcurrentHashMap对象,所以是线程安全的,
        // ListenerRetriever中有个监听器的集合,并有些简单的逻辑封装,调用它的getApplicationListeners方法返回的监听类集合是排好序的(order注解排序)
        ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
        if (retriever != null) {
            //如果retrieverCache中找到对应的监听器集合,就立即返回了
            return retriever.getApplicationListeners();
        }
    
        if (this.beanClassLoader == null ||
                (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
                        (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            //如果retrieverCache中没有数据,就在此查出数据并放入缓存,
            //先加锁
            synchronized (this.retrievalMutex) {
                //双重判断的第二重,避免自己在BLOCK的时候其他线程已经将数据放入缓存了
                retriever = this.retrieverCache.get(cacheKey);
                if (retriever != null) {
                    return retriever.getApplicationListeners();
                }
                //新建一个ListenerRetriever对象
                retriever = new ListenerRetriever(true);
                //retrieveApplicationListeners方法复制找出某个消息类型加来源类型对应的所有监听器
                Collection<ApplicationListener<?>> listeners =
                        retrieveApplicationListeners(eventType, sourceType, retriever);
                //存入retrieverCache  
                this.retrieverCache.put(cacheKey, retriever);
                //返回结果
                return listeners;
            }
        }
        else {
            // No ListenerRetriever caching -> no synchronization necessary
            return retrieveApplicationListeners(eventType, sourceType, null);
        }
    }
    

    在广播消息的时刻,如果某个类型的消息在缓存中找不到对应的监听器集合,就调用retrieveApplicationListeners方法去找出符合条件的所有监听器,然后放入这个集合。

    相关文章

      网友评论

          本文标题:Spring事件监听--源码总结

          本文链接:https://www.haomeiwen.com/subject/czdgyqtx.html