在spring中如何开启
通过 @EnableAsync 来开启异步方法的能力。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
// ...`
}
@EnableAsync 注解 Import 了 AsyncConfigurationSelector,这个在 SpringBoot 中是非常常见的一种写法,这里需要关注的是选择了哪个自动配置类;adviceMode 默认是 false,这里就以 ProxyAsyncConfiguration 为例:
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
AsyncAnnotationBeanPostProcessor
在 org.springframework.scheduling.annotation.ProxyAsyncConfiguration中最主要的就是创建 AsyncAnnotationBeanPostProcessor,从名字看,AsyncAnnotationBeanPostProcessor 就是来处理 @Async 注解的;目的很明确,就是创建对应 bean 的代理对象,以便于执行方法时能够进行 AOP 拦截(具体细节可以看 org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization这个方法)。
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
AnnotationAsyncExecutionInterceptor
AOP 中最外层的是代理类,然后是织入器(advisor),再接着是切面(advice he PointCut);前面已经将创建代理对象的逻辑进行了介绍,所以接下来是织入器(advisor)和切面的创建。实际上织入器(advisor)的创建逻辑也是在 AsyncAnnotationBeanPostProcessor 中完成的。
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
// 创建 advisor
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
在 AsyncAnnotationAdvisor 的构造函数中,会构建 Advice 和 Pointcut
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 省略其他代码
/// ...
// 创建 advice
this.advice = buildAdvice(executor, exceptionHandler);
// 创建 pointcut
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
Advice 就是具体执行拦截的逻辑,这里的 advice 实际上 AnnotationAsyncExecutionInterceptor(why ? 因饰Advice 是 MethodInterceptor 的父类)。
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 这里
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
Tips
除了 adviceMode,一般情况下还会涉及到另外一个参数,即 proxyTargetClass;proxyTargetClass 在设置为 true 和 false 时,对应使用的代理机制大致如下:
- true
- 目标对象实现了接口 – 使用 CGLIB 代理机制
- 目标对象没有接口(只有实现类) – 使用 CGLIB 代理机制
- false
- 目标对象实现了接口 – 使用 JDK 动态代理机制(代理所有实现了的接口)
- 目标对象没有接口(只有实现类) – 使用 CGLIB 代理机制
线程池
创建 AnnotationAsyncExecutionInterceptor 时初始化线程池
线程池的创建是在创建 AnnotationAsyncExecutionInterceptor 对象时完成,代码如下:
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}
复制代码
在其父类 AsyncExecutionAspectSupport 中完成具体线程池创建
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
在 getDefaultExecutor 方法中, 会先从 Spring 容器找 TaskExecutor 类型的线程池 Bean,如果找不到,会扩大范围找 Executor 类型的线程池 Bean,如果找不到,则返回 null。
这里是个延迟载入的操作,即只有当异步方法被调用时,才会触发 SingletonSupplier get 操作,从而触发 getBean 的逻辑,如果你在 debug 时出现没有正常走到断点的情况,可以关注下这个场景。
默认线程池 SimpleAsyncTaskExecutor
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
从这段逻辑看,如果从 Spring 容器中没有找到对应的线程池 Bean,那么就创建 SimpleAsyncTaskExecutor 作为默认的线程池。
This class also customizes the Executor by defining a new bean. Here, the method is named taskExecutor, since this is the specific method name for which Spring searches. In our case, we want to limit the number of concurrent threads to two and limit the size of the queue to 500. There are many more things you can tune. If you do not define an Executor bean, Spring creates a SimpleAsyncTaskExecutor and uses that.
方法执行任务的提交
基于前面的分析,方法执行任务的提交一定是发生在拦截到 @Async 注解时,也就是 AnnotationAsyncExecutionInterceptor 中;通过分析代码,在其父类 AsyncExecutionInterceptor
中,验证了分析。下面是部分核心逻辑:
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 1、拿到 Method
// 2、根据 Method 获取 executor
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
// 3、创建方法执行任务 task
Callable<Object> task = () -> {
// ...
};
// 4、提交 task
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
determineAsyncExecutor 中说明了, executor 是和方法对象绑定的,即每个方法都有一个自己的 executor;异步方法在第一次执行的时候创建自己的 executor,然后缓存到内存中。在 doSubmit 中,会根据 returnType 的类型进行相应的处理
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// CompletableFuture
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
// ListenableFuture
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
// Future
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
// void
else {
executor.submit(task);
return null;
}
}
如何自定义线程池
SpringBoot 提供了 org.springframework.scheduling.annotation.AsyncConfigurer 接口让开发人员可以自定义线程池执行器;框架默认提供了一个空的实现类 AsyncConfigurerSupport,两个方法体内部都是空实现。这部分逻辑在 org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers体现:
/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
// for this
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
AsyncConfigurer 在项目中只能有一个实现 Bean,如果超过一个,将会抛出 IllegalStateException 异常。
网友评论