美文网首页Spring 功能使用及源码解析
Spring 框架事件收发功能的使用 (一) 基本使用

Spring 框架事件收发功能的使用 (一) 基本使用

作者: 云逸Dean | 来源:发表于2019-07-11 16:57 被阅读0次

Spring 框架事件收发功能的使用 (一) 基本使用

1. 概述

Spring 框架时间收发功能本系列的文章总共分为三部分。

  • 第一部分:为大家介绍如何用 Spring 来进行事件收发,如何让 Spring 的事件收发变为异步进行的,Spring 又是如何实现异步收发的。
  • 第二部分:为大家介绍如何在事务的各个节点去触发监听器的执行,以及基于事务的事件监听器在异步配置下失效的原因。
  • 第三部分:为大家找到如何让基于事务的事件监听器能在异步配置下正确使用和运行。

下列内容翻译自 Spring framework document 1.15.2
Spring 在 ApplicationContext 中的事件处理,通过 ApplicationEvent class 和 ApplicationListener interface 所提供。如果一个 bean 实现了 ApplicationListener 并且被放发布到了 context 中,每当一个 ApplicationEvent 被发布到 ApplicationContext 中,那这个 bean 便会被通知。实际上,这个流程的实现基于观察者模式
Spring 中还有内建的 7 种 Event,这里不做赘述,有兴趣的同学可以通过上方连接进入 Spring 官方文档进行查看。

2. 事件收发的实现

2.1 概述

第一小节的内容主要是介绍 Spring 框架中事件收发功能的两种实现方式,一种是基于继承类和实现接口的方式,另一种是基于注解的方式。推荐还是使用注解的方式,因为这种方式能将同一业务特征的监听和处理放入一个服务类中,便于统一的管理和查阅。

2.2 自定义使用

2.2.1 基于继承的方式

我们首先需要创建一个继承于 ApplicationEvent 的事件数据类:

@Data
public class BlackListEvent extends ApplicationEvent {

    private final String address;
    private final String content;

    // accessor and other methods...
}

为了能将我们新创建的事件类实例发布出去,我们需要有一个实现了 ApplicationEventPublisherAware 的服务类。同时,我们需要将 ApplicationEventPublisher 注入到服务类。然后,我们可以通过publisher实例的 publish 方法将我们的 event 实例发布出去。

@Service
public class EmailService implements ApplicationEventPublisherAware {

    private List<String> blackList;
    private ApplicationEventPublisher publisher;

    public void setBlackList(List<String> blackList) {
        this.blackList = blackList;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void sendEmail(String address, String content) {
        if (blackList.contains(address)) {
            publisher.publishEvent(new BlackListEvent(this, address, content));
            return;
        }
        // send email...
    }
}

此时,我们成功的将事件发送出去了。那如果我们对这个事件感兴趣的话,如何去接受他呢?接下来我们就来看看怎么能接收到我们刚刚发出的事件。下列的代码实现了我们对 BlackListEvent 的监听器,当我们将这个监听器实例化以后,我们就可以正确的监听到发送出来的BlackListEvent 事件并处理他了。

@Componet
public class BlackListNotifier implements ApplicationListener<BlackListEvent> {

    private String notificationAddress;

    public void setNotificationAddress(String notificationAddress) {
        this.notificationAddress = notificationAddress;
    }

    public void onApplicationEvent(BlackListEvent event) {
        // notify appropriate parties via notificationAddress...
    }

这是最基本的实现方式,但是我相信大部分朋友都已经习惯了使用 Spring 4.0 引入的注解模式来使用 Spring 了。所以我们接下来就会讲,如果用更加 fashion 的方式来实现我们的事件收发。

2.2.2 基于注解的方式

首先,大家要确认一下自己的 Spring 版本是否是在 4.2+ 。如果低于这个版本的话,这段内容对于你来说是没有任何用处的哟。
基于注解我们能够更加简单明了的来实现监听器。并且我们有三种不同的实现方式,来针对于我们不同的监听场景。

  • 1.接收到需要监听的 event ,并且处理他
public class BlackListNotifier {

    private String notificationAddress;

    public void setNotificationAddress(String notificationAddress) {
        this.notificationAddress = notificationAddress;
    }

    @EventListener
    public void processBlackListEvent(BlackListEvent event) {
        // notify appropriate parties via notificationAddress...
    }
}
  • 2.监听一个或多个事件,但是不需要 event 的数据,只是对应 event 发布以后,需要做一些处理
@EventListener({ContextStartedEvent.class, ContextRefreshedEvent.class})
public void handleContextStart() {
    ...
}
  • 3.使用SpEL表达式来监听事件(其实我也不太会 SpEL 表达式,后面会抽时间看看文档,再写个博客来学习一下)
@EventListener(condition = "#blEvent.content == 'my-event'")
public void processBlackListEvent(BlackListEvent blEvent) {
    // notify appropriate parties via notificationAddress...
}

3. 异步事件收发的实现

3.1 概述

上一节的内容介绍了如何使用 Spring 框架自带的事件收发功能。通常,我们使用事件收发方式来处理的目的,都是希望能将事件触发和事件监听这两部分的业务进行解耦以及能够无阻塞的执行监听后的任务。但是,如果我们不去特别配置的话,那整个事件的收发其实是同步进行的。本节将会分为两个部分来介绍异步的进行事件的收发,第一部分是介绍我们如何配置能让 spring 的事件收发功能在异步下进行。第二部分我们将会对 Spring 框架事件处理源码进行普析,看看 Spring 到底是如何实现事件收发的,以及异步是如何实现的。

3.2 实现异步的事件收发

3.2.1 使用注解实现异步(官方文档推荐)

我们可以通过在监听器上添加 @Async 注解的方式,来实现某些监听器的异步化。但是,使用这种方式的时候一定要记住,在 Spring 的配置类中需要添加开启异步化的注解 @EnableAsync 。这种方式的好处是,我们可以让那些需要某些监听器使用异步的方式进行处理,其他的监听器仍然用同步的方式进行任务执行。

@EventListener
@Async
public void processBlackListEvent(BlackListEvent event) {
    // BlackListEvent is processed in a separate thread
}

这种方式比较灵活,但是如果本身项目内部的事件监听器比较多,而且都需要异步的话,想想那满屏幕的 @Sync,看起来肯定是非常不整洁和优雅的。那么,我们还有什么办法可以让项目里面所有的 EventListener 自然而然就是异步的呢?请看下一小节。

3.3 Spring 框架是如何实现异步收发的

为了能让 EventListener 注释的监听器方法都是异步的,我们需要进入 Spring 框架的源码,看看到底 Spring 是怎么实现这个事件监听的。
首先,我们进入 @EventListener 这个类,可以看到在类名上面一大段文字说明,其中包含下面这一段解释:

 * <p>Processing of {@code @EventListener} annotations is performed via
 * the internal {@link EventListenerMethodProcessor} bean which gets
 * registered automatically when using Java config or manually via the
 * {@code <context:annotation-config/>} or {@code <context:component-scan/>}
 * element when using XML config.

大概,含义就是说,我们这个注解的功能是通过内部的 EventListenerMethodProcessor 这个类来实现功能的,这个类会在你使用 Java Config 的模式或者是使用 XML 的方式自动扫描后,会被注册。那自然而然我们进入这个类,找一找他是如何实现 @EventListener 注解的功能的。
接下来我会在源码中进行注释的方式进行分解,这样更容易将我对源码的理解使用文本的方式输出出来。

public class EventListenerMethodProcessor
        implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {

    protected final Log logger = LogFactory.getLog(getClass());

    @Nullable
    private ConfigurableApplicationContext applicationContext;

    @Nullable
    private ConfigurableListableBeanFactory beanFactory;

    @Nullable
    private List<EventListenerFactory> eventListenerFactories;

    private final EventExpressionEvaluator evaluator = new EventExpressionEvaluator();

    private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

    /**
    * 用于注入 ApplicationContext 
    **/
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext,
                "ApplicationContext does not implement ConfigurableApplicationContext");
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }
    /**
    * 实现BeanFactoryPostProcessor接口以后,可以在 spring 初始化 bean 时通过这个方法进行自定义的 bean 初始化。
    * 但是,会导致 bean 过早的初始化,可能会导致一些意想不到的副作用。
    **/
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        // 获取传入 class 类型的 bean 列表
        Map<String, EventListenerFactory> beans = beanFactory.getBeansOfType(EventListenerFactory.class, false, false);
        List<EventListenerFactory> factories = new ArrayList<>(beans.values());
        // 将所有的 EventListnerFactory 实例 bean 按照 注解顺序进行排序(这是为了让 @Order 注解生效)
        AnnotationAwareOrderComparator.sort(factories);
        this.eventListenerFactories = factories;
    }

    /**
    * 实现SmartInitializingSingleton接口以后,可以在通常的 singleton bean 初始化完成后被回调
    * 
    **/
    @Override
    public void afterSingletonsInstantiated() {
        ConfigurableListableBeanFactory beanFactory = this.beanFactory;
        // 判断 beanFactory 在之前是否正确注入
        Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
        String[] beanNames = beanFactory.getBeanNamesForType(Object.class);//获取所有 bean 的名字
        for (String beanName : beanNames) {
            if (!ScopedProxyUtils.isScopedTarget(beanName)//判断这个 bean 是否被代理) {
                Class<?> type = null;
                try {
                    type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);//获取对应 beanName 的 bean class 类型
                }
                catch (Throwable ex) {
                    // An unresolvable bean type, probably from a lazy bean - let's ignore it.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
                    }
                }
                if (type != null) {
                    if (ScopedObject.class.isAssignableFrom(type)) {//判断此 bean 的类型是否是通过代理的
                        try {
                            Class<?> targetClass = AutoProxyUtils.determineTargetClass(
                                    beanFactory, ScopedProxyUtils.getTargetBeanName(beanName)//获取被代理后的 bean 的 beanName
                                    );//获取被代理的 bean 的类型
                            if (targetClass != null) {
                                type = targetClass;
                            }
                        }
                        catch (Throwable ex) {
                            // An invalid scoped proxy arrangement - let's ignore it.
                            if (logger.isDebugEnabled()) {
                                logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", ex);
                            }
                        }
                    }
                    try {
                        processBean(beanName, type);//注册所有@EventListner @TransactionalEventListener
                    }
                    catch (Throwable ex) {
                        throw new BeanInitializationException("Failed to process @EventListener " +
                                "annotation on bean with name '" + beanName + "'", ex);
                    }
                }
            }
        }
    }

    private void processBean(final String beanName, final Class<?> targetType) {
        if (!this.nonAnnotatedClasses.contains(targetType)//排除掉 bean 类型中没有@EventListener 的注解
                 && !isSpringContainerClass(targetType)//不是 Spring 框架内部的类
                 ) {
            Map<Method, EventListener> annotatedMethods = null;
            try {
                //获取对应bean 类型中所有被 @EventListener 注解的方法
                annotatedMethods = MethodIntrospector.selectMethods(targetType,
                        (MethodIntrospector.MetadataLookup<EventListener>) method ->
                                AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
            }
            catch (Throwable ex) {
                // An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
                if (logger.isDebugEnabled()) {
                    logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
                }
            }
            // 如果 bean 类型中没有 @EventListener 注解,则把他记录下来,避免下次再去处理,可以优化性能
            if (CollectionUtils.isEmpty(annotatedMethods)) {
                this.nonAnnotatedClasses.add(targetType);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
                }
            }
            else { 
                ConfigurableApplicationContext context = this.applicationContext;
                Assert.state(context != null, "No ApplicationContext set");
                List<EventListenerFactory> factories = this.eventListenerFactories;
                Assert.state(factories != null, "EventListenerFactory List not initialized");

                for (Method method : annotatedMethods.keySet()) {
                    for (EventListenerFactory factory : factories) {
                        if (factory.supportsMethod(method)) {
                            Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
                            ApplicationListener<?> applicationListener =
                                    factory.createApplicationListener(beanName, targetType, methodToUse);
                            if (applicationListener instanceof ApplicationListenerMethodAdapter) {
                                ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
                            }
                            context.addApplicationListener(applicationListener);
                            break;
                        }
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
                            beanName + "': " + annotatedMethods);
                }
            }
        }
    }

    /**
     * Determine whether the given class is an {@code org.springframework}
     * bean class that is not annotated as a user or test {@link Component}...
     * which indicates that there is no {@link EventListener} to be found there.
     * @since 5.1
     */
    private static boolean isSpringContainerClass(Class<?> clazz) {
        return (clazz.getName().startsWith("org.springframework.") &&
                !AnnotatedElementUtils.isAnnotated(ClassUtils.getUserClass(clazz), Component.class));
    }

}

上面,一大堆的代码就是 Spring 注册监听器的逻辑,那我们接下来看看 publisher 是怎么把 event 发送给他们的。


public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

    @Nullable
    private Executor taskExecutor;

    @Nullable
    private ErrorHandler errorHandler;


    /**
     * Create a new SimpleApplicationEventMulticaster.
     */
    public SimpleApplicationEventMulticaster() {
    }

    /**
     * Create a new SimpleApplicationEventMulticaster for the given BeanFactory.
     */
    public SimpleApplicationEventMulticaster(BeanFactory beanFactory) {
        setBeanFactory(beanFactory);
    }


    /**
     * 这里可以将监听器异步化的线程池绑定到这个 multicaster 中,通过@Nullable注解可以看出
     * 如果,我们没有现式的去绑定线程池,那么这个 multicaster 应该是使用同步的方式进行广播。
    **/
    public void setTaskExecutor(@Nullable Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Return the current task executor for this multicaster.
     */
    @Nullable
    protected Executor getTaskExecutor() {
        return this.taskExecutor;
    }

    /**
     * 这里可以看到,如果我们在事件发送失败以后,需要一些处理,那我们也可以自定义一个
     * ErrorHandler 给绑定到这个 广播器 上
     */
    public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    @Nullable
    protected ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    
    @Override
    public void multicastEvent(ApplicationEvent event) {
        multicastEvent(event, resolveDefaultEventType(event));
    }

    /**
    * AbstractApplicationEventMulticaster  
    **/
    @Override
    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {//将所有对应的
            Executor executor = getTaskExecutor();
            if (executor != null) {//这里便是如何实现异步的地方,如果我们给他绑定了线程池,那么他会使用 lambda 的方式调用 listener。
                executor.execute(() -> invokeListener(listener, event));
            }
            else {
                invokeListener(listener, event);
            }
        }
    }

}

所以,从上面的源码可以得出。如果我们给这个 multicaster 配置上他需要的线程池,那么他便会以异步的方式来调用 listener。

4. 小结

我们在这篇文章中,告诉了大家如何使用类实现的方式和注解的方式来使用 Spring 框架自带的事件收发功能。如何将 Spring 的事件收发变为异步的。Spring 又是如何实现异步收发事件的。接下来的一篇文章中,介绍如何将事件的监听器的触发与事件发布所处事务进行联动,以及在异步场景下的陷阱。

相关文章

网友评论

    本文标题:Spring 框架事件收发功能的使用 (一) 基本使用

    本文链接:https://www.haomeiwen.com/subject/jaidkctx.html