1. Integration steps
Publishing the event:
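import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class EventController {
    @Autowired
    private ApplicationContext applicationContext;

    @GetMapping("sendEvent")
    public void sendEvent() {
        // Produces the "发布事件" ("publishing event") line seen in the output
        log.info("发布事件");
        // "发送邮件" means "send email"
        EmailEvent emailEvent = new EmailEvent(applicationContext, "发送邮件");
        applicationContext.publishEvent(emailEvent);
    }
}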
The event object:
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ApplicationContextEvent;

public class EmailEvent extends ApplicationContextEvent {
    private String message;

    /**
     * Create a new EmailEvent.
     *
     * @param source the {@code ApplicationContext} that the event is raised for
     * (must not be {@code null})
     */
    public EmailEvent(ApplicationContext source) {
        super(source);
    }

    public EmailEvent(ApplicationContext source, String message) {
        super(source);
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
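The listener class:
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class EventService {
    // Invoked for every published EmailEvent
    @EventListener
    public void accept(EmailEvent emailEvent) {
        // "接收事件" means "received event"
        log.info("接收事件:{}", emailEvent.getMessage());
    }
}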
Output (the logged messages read "publishing event" and "received event: send email"):
2021-01-06 14:26:35,535 INFO [59754] [http-nio-8089-exec-1] [] (EventController.java:26): 发布事件
2021-01-06 14:26:35,535 INFO [59754] [http-nio-8089-exec-1] [] (EventService.java:19): 接收事件:发送邮件
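As the output shows, the code that publishes the event and the listener method run on the same thread (http-nio-8089-exec-1), so by default the listener is invoked synchronously.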
2. Source code analysis
Source location: org.springframework.context.support.AbstractApplicationContext#refresh, which calls initApplicationEventMulticaster():
public abstract class AbstractApplicationContext extends DefaultResourceLoader
        implements ConfigurableApplicationContext {
    // Initialize the event multicaster
    protected void initApplicationEventMulticaster() {
        ConfigurableListableBeanFactory beanFactory = getBeanFactory();
        // Check whether a bean with this name is already defined in the local bean factory
        if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
            // If so, use it
            this.applicationEventMulticaster =
                    beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
            if (logger.isDebugEnabled()) {
                logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
            }
        }
        else {
            // Otherwise create the default multicaster
            this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
            // and register it as a singleton in the container
            beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
            if (logger.isDebugEnabled()) {
                logger.debug("Unable to locate ApplicationEventMulticaster with name '" +
                        APPLICATION_EVENT_MULTICASTER_BEAN_NAME +
                        "': using default [" + this.applicationEventMulticaster + "]");
            }
        }
    }
}
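As the code above shows, we can declare our own ApplicationEventMulticaster bean. As long as it is registered under the name held by APPLICATION_EVENT_MULTICASTER_BEAN_NAME ("applicationEventMulticaster"), the context uses it instead of creating the default.
So what does the default SimpleApplicationEventMulticaster do when it handles an event?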
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
    // Thread pool (the key to asynchronous listening)
    @Nullable
    private Executor taskExecutor;
    // Error handler
    @Nullable
    private ErrorHandler errorHandler;
    ...
    @Override
    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            // An executor is configured: hand the invocation to it
            if (executor != null) {
                executor.execute(() -> invokeListener(listener, event));
            }
            else {
                // No executor: invoke synchronously on the calling thread
                invokeListener(listener, event);
            }
        }
    }

    private ResolvableType resolveDefaultEventType(ApplicationEvent event) {
        return ResolvableType.forInstance(event);
    }

    /**
     * Invoke the given listener with the given event.
     * @param listener the ApplicationListener to invoke
     * @param event the current event to propagate
     * @since 4.1
     */
    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        // An error handler is configured
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                doInvokeListener(listener, event);
            }
            catch (Throwable err) {
                // On failure, delegate to the error handler
                errorHandler.handleError(err);
            }
        }
        else {
            doInvokeListener(listener, event);
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        // Catch ClassCastException
        catch (ClassCastException ex) {
            String msg = ex.getMessage();
            // A null message, or one that names the event class, means the cast failure was caused by the event type
            if (msg == null || matchesClassCastMessage(msg, event.getClass().getName())) {
                // Possibly a lambda-defined listener which we could not resolve the generic event type for
                // -> let's suppress the exception and just log a debug message.
                Log logger = LogFactory.getLog(getClass());
                if (logger.isDebugEnabled()) {
                    logger.debug("Non-matching event type for listener: " + listener, ex);
                }
            }
            else {
                throw ex;
            }
        }
    }

    private boolean matchesClassCastMessage(String classCastMessage, String eventClassName) {
        // On Java 8, the message starts with the class name: "java.lang.String cannot be cast..."
        // Return true if the message starts with the event class name
        if (classCastMessage.startsWith(eventClassName)) {
            return true;
        }
        // On Java 9, the message used to contain the module name: "java.base/java.lang.String cannot be cast..."
        // Find the first '/' in the message and compare what follows it
        int moduleSeparatorIndex = classCastMessage.indexOf('/');
        if (moduleSeparatorIndex != -1 && classCastMessage.startsWith(eventClassName, moduleSeparatorIndex + 1)) {
            return true;
        }
        // Assuming an unrelated class cast failure...
        return false;
    }
}
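The branching in multicastEvent and invokeListener above can be tried out without Spring. The following is a minimal sketch, not Spring's code: MiniMulticaster, listenerThreadName, the String event type, and the Consumer-based listener and error handler are all hypothetical stand-ins chosen for illustration. It only shows that the listener runs on the publisher's thread when no executor is set and on a pool thread when one is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical, simplified model of SimpleApplicationEventMulticaster (not Spring code). */
public class MiniMulticaster {
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private Executor taskExecutor;             // null means "invoke synchronously"
    private Consumer<Throwable> errorHandler;  // null means "let exceptions propagate"

    public void addListener(Consumer<String> listener) { listeners.add(listener); }
    public void setTaskExecutor(Executor taskExecutor) { this.taskExecutor = taskExecutor; }
    public void setErrorHandler(Consumer<Throwable> errorHandler) { this.errorHandler = errorHandler; }

    /** Same branching as multicastEvent: use the executor if present, else call directly. */
    public void multicast(String event) {
        for (Consumer<String> listener : listeners) {
            if (taskExecutor != null) {
                taskExecutor.execute(() -> invoke(listener, event));
            } else {
                invoke(listener, event);
            }
        }
    }

    /** Same branching as invokeListener: route failures to the handler only if one is set. */
    private void invoke(Consumer<String> listener, String event) {
        if (errorHandler == null) {
            listener.accept(event);
            return;
        }
        try {
            listener.accept(event);
        } catch (Throwable err) {
            errorHandler.accept(err);
        }
    }

    /** Publishes one event and returns the name of the thread the listener ran on. */
    public static String listenerThreadName(Executor executor) {
        MiniMulticaster multicaster = new MiniMulticaster();
        multicaster.setTaskExecutor(executor);
        String[] seen = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        multicaster.addListener(event -> {
            seen[0] = Thread.currentThread().getName();
            done.countDown();
        });
        multicaster.multicast("send email");
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("publisher thread: " + Thread.currentThread().getName());
        System.out.println("no executor:      " + listenerThreadName(null));
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-1"));
        try {
            System.out.println("with executor:    " + listenerThreadName(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Without an executor the listener reports the publisher's own thread, matching the log output in section 1; with the single-thread pool it reports event-1. Note also that once an executor is set, an exception thrown by a listener no longer reaches the publisher, which is one reason to consider configuring an error handler together with the executor.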
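3. Improvement: asynchronous event broadcasting
Define your own ApplicationEventMulticaster bean and give it a thread pool; listeners are then invoked asynchronously on the pool's threads.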
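import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class EventConfig {
    // The bean name must match the one AbstractApplicationContext looks up
    @Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
    public SimpleApplicationEventMulticaster myEventMulticaster() {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
        return simpleApplicationEventMulticaster;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core pool size
        executor.setCorePoolSize(5);
        // Maximum pool size
        executor.setMaxPoolSize(20);
        // Queue capacity
        executor.setQueueCapacity(100);
        // Keep-alive time for idle threads above the core size, in seconds
        executor.setKeepAliveSeconds(300);
        // Thread name prefix
        executor.setThreadNamePrefix("thread-");
        // Rejection policy: when the queue is full and the pool is at its maximum size,
        // silently discard the task (that listener invocation is lost)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // Wait for queued tasks to finish before shutting the pool down
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}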
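One consequence of the DiscardPolicy used in the configuration above is worth seeing in isolation: once the pool is saturated, further listener invocations are dropped without any signal to the publisher. The sketch below uses only java.util.concurrent, with a deliberately tiny pool (1 thread, queue of 1) so saturation is easy to reach. RejectionPolicyDemo and runAndCount are hypothetical names, and the pool sizes are not the ones from the configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionPolicyDemo {

    /**
     * Submits {@code submitted} tasks to a saturated pool (1 thread, queue of 1).
     * Returns {tasks that actually ran, rejections the caller was told about}.
     */
    public static int[] runAndCount(RejectedExecutionHandler policy, int submitted) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        int reported = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();        // hold the worker so the pool stays saturated
                            ran.incrementAndGet();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    reported++;                     // AbortPolicy surfaces the overflow to the caller
                }
            }
        } finally {
            release.countDown();                    // let the held tasks finish
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {ran.get(), reported};
    }

    public static void main(String[] args) {
        int[] discard = runAndCount(new ThreadPoolExecutor.DiscardPolicy(), 3);
        int[] abort = runAndCount(new ThreadPoolExecutor.AbortPolicy(), 3);
        System.out.println("DiscardPolicy: ran=" + discard[0] + ", reported=" + discard[1]);
        System.out.println("AbortPolicy:   ran=" + abort[0] + ", reported=" + abort[1]);
    }
}
```

With three submissions, both policies run two tasks; DiscardPolicy reports nothing for the third, while AbortPolicy throws RejectedExecutionException to the submitter. If losing events silently is not acceptable, a policy that reports or retries the overflow may be a better fit than DiscardPolicy.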