美文网首页
ApplicationEvent&ApplicationList

ApplicationEvent&ApplicationList

作者: 秋水畏寒 | 来源:发表于2021-02-08 17:17 被阅读0次

1、参考

Standard and Custom Events

2、前言

公司项目中有个发送站内信的功能,是使用ApplicationEventApplicationListener实现的,对于这一块,之前未接触过,在该项目中,其具体功能如下:

  • ApplicationContext发送ApplicationEvent类型的站内信
  • ApplicationListener接收到站内信信息,进行入库处理,createTime直接写死成了new Date()
  • 用户界面读取入库后的数据,根据createTime倒序呈现给用户

产品经理要求站内信需要按发送的顺序呈现给用户,那问题来了:

ApplicationListener接收到Event信息时,是否是有序的呢?

3、Demo

基于spring boot创建一个简易的ApplicationEvent使用demo,基于该demo执行并发测试,初步验证是否为有序接收

3.1、CustomEvent

自定义Event,继承ApplicationEvent,并实现构造方法

public class CustomEvent extends ApplicationEvent {
    /**
     * Create a new ApplicationEvent.
     *
     * @param source the object on which the event initially occurred (never {@code null})
     */
    public CustomEvent(Object source) {
        super(source);
    }
}

3.2、CustomPublisher

Event事件发布者,此处定义publish方法作为发送入口,提供给controller调用,方便测试

@Component
public class CustomPublisher {

    @Autowired
    private ApplicationContext applicationContext;

    public void publish(CustomEvent customEvent) {
        applicationContext.publishEvent(customEvent);
    }
}

除了使用注解的方式,spring也提供了实现ApplicationEventPublisherAware接口的方式,具体可戳参考链接

3.3、CustomListener

Event事件监听器,接收Event事件并对其进行处理,此处进行简单的打印

@Component
public class CustomListener {
    @EventListener
    public void listen(CustomEvent customEvent) {
        System.out.println(customEvent.getSource());
    }
}

除了使用注解的方式,spring也提供了ApplicationListener接口的方式,具体可戳参考链接

3.4、测试

@RestController
public class RestController {
    @Autowired
    private CustomPublisher customPublisher;
    int i = 0;

    @GetMapping(value = "/publish")
    public void publish() {
        CustomEvent customEvent = new CustomEvent(i++);
        customPublisher.publish(customEvent);
    }
}

使用postmanrunner功能,创建如下runner,进行并发测试:

runner
结果

从结果来看,事件接收是有序的,查阅官网,可以看到这么一句话:

You can register as many event listeners as you wish, but note that, by default, event listeners receive events synchronously. This means that the publishEvent() method blocks until all listeners have finished processing the event.
大致意思是:event listeners默认上同步接收events,这意味着,publicEvent()方法将会阻塞直至所有的listeners处理完event为止。

4、源码

追溯源码,查看ApplicationEvent的发布和接收功能是如何实现的,首先,程序入口位于ApplicationContext#publishEvent

    default void publishEvent(ApplicationEvent event) {
        publishEvent((Object) event);
    }

方法内调用了重载的publicEvent,而该重载publishEvent是一个接口方法,有以下实现:

publishEvent实现

显然,具体的实现类是AbstractApplicationContext#publishEvent

    @Override
    public void publishEvent(Object event) {
        publishEvent(event, null);
    }

继续调用重载方法publishEvent(Object event, @Nullable ResolvableType eventType)

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
        Assert.notNull(event, "Event must not be null");
        if (logger.isTraceEnabled()) {
            logger.trace("Publishing event in " + getDisplayName() + ": " + event);
        }

        // Decorate event as an ApplicationEvent if necessary
        ApplicationEvent applicationEvent;
        if (event instanceof ApplicationEvent) {
            applicationEvent = (ApplicationEvent) event;
        }
        else {
            applicationEvent = new PayloadApplicationEvent<>(this, event);
            if (eventType == null) {
                eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
            }
        }

        // Multicast right now if possible - or lazily once the multicaster is initialized
        if (this.earlyApplicationEvents != null) {
            this.earlyApplicationEvents.add(applicationEvent);
        }
        else {
            getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
        }

        // Publish event via parent context as well...
        if (this.parent != null) {
            if (this.parent instanceof AbstractApplicationContext) {
                ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
            }
            else {
                this.parent.publishEvent(event);
            }
        }
    }

逻辑可以总结为以下三点:

  • 将入参event包装成applicationEvent
  • 交由多播器进行消息发布(multicastEvent
  • 若父级上下文不为空,则父级上下文同样需要进行消息发布

显然,核心方法在于multicastEvent

    @Override
    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            if (executor != null) {
                executor.execute(() -> invokeListener(listener, event));
            }
            else {
                invokeListener(listener, event);
            }
        }
    }

这里使用了线程池,调用了invokeListener

    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                doInvokeListener(listener, event);
            }
            catch (Throwable err) {
                errorHandler.handleError(err);
            }
        }
        else {
            doInvokeListener(listener, event);
        }
    }

继续调用doInvokeListener

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {
            // ……
        }
    }

onApplicationEvent,是ApplicationListener接口类中的方法,在3.3节中,有提及自定义监听器可以继承ApplicationListener接口并实现onApplicationEvent方法进行监听,若是使用该种方法,则事件的发送和接收处理的源码就已经追溯完毕,不过本文中使用的是注解,故需要继续跟踪下去,onApplicationEvent的实现类颇多:

onApplicationEvent实现类
通过debug验证,ApplicationListenerMethodAdapter才是注解类listener的具体实现:
    public void onApplicationEvent(ApplicationEvent event) {
        processEvent(event);
    }

查看processEvent

    public void processEvent(ApplicationEvent event) {
        Object[] args = resolveArguments(event);
        if (shouldHandle(event, args)) {
            Object result = doInvoke(args);
            if (result != null) {
                handleResult(result);
            }
            else {
                logger.trace("No result object given - no result to handle");
            }
        }
    }

可以看到核心在于doInvoke

    protected Object doInvoke(Object... args) {
        Object bean = getTargetBean();
        ReflectionUtils.makeAccessible(this.method);
        try {
            return this.method.invoke(bean, args);
        }
                // ……
      }

这里使用了反射,调用method方法,那么,method是什么时候赋值,具体值又是什么呢,对method进行Find Usages,可以看到:

    public ApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {
        this.beanName = beanName;
        this.method = BridgeMethodResolver.findBridgedMethod(method);
        // ……
    }

method是在ApplicationListenerMethodAdapter构造方法中赋值的,debug,可以看到具体值为:

public void com.kungyu.rabbitmq.CustomListener.listen(com.kungyu.rabbitmq.CustomEvent)

也就是我们自定义的监听方法,那么,ApplicationListenerMethodAdapter构造方法何时被调用,继续Find Usages

public class DefaultEventListenerFactory implements EventListenerFactory, Ordered {

    // ……
    @Override
    public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
        return new ApplicationListenerMethodAdapter(beanName, type, method);
    }

}

createApplicationListener进行Find Usages

protected void processBean(
            final List<EventListenerFactory> factories, final String beanName, final Class<?> targetType) {
    // ……
    ApplicationListener<?> applicationListener =
                                    factory.createApplicationListener(beanName, targetType, methodToUse);
    // ……
}

processBean进行Find Usages,可以看到是在afterSingletonsInstantiated进行调用的,而afterSingletonsInstantiated是类实例化相关的内容,此处不予展开

5、总结

从测试结果来看,ApplicationListener是有序接收消息的,从源码来看,其实现方式也不算过于复杂,是对知识点的一个很好的补充。另外,在中间件横行的时代,业务角度上貌似这一功能没怎么被使用,出于解耦的目的,应该都会用MQ实现该功能了。

相关文章

网友评论

      本文标题:ApplicationEvent&ApplicationList

      本文链接:https://www.haomeiwen.com/subject/oxoxxltx.html