核心关键点:
@Async("npmExecutor") 异步调用的返回结果只能是void或者Future<T>,如果执行Future.get()方法,等待异步调用结果,则主线程会阻塞,知道拿到结果后,才会继续执行
@Async源码注释中可以看到具体用法解释:
public @interface Async {
/**
* A qualifier value for the specified asynchronous operation(s).
指定为自己定义的线程池npmExecutor,否则默认最大线程数不是自己设定的50个
* <p>May be used to determine the target executor to be used when executing
* the asynchronous operation(s), matching the qualifier value (or the bean
* name) of a specific {@link java.util.concurrent.Executor Executor} or
* {@link org.springframework.core.task.TaskExecutor TaskExecutor}
* bean definition.
* <p>When specified on a class-level {@code @Async} annotation, indicates that the
* given executor should be used for all methods within the class. Method-level use
* of {@code Async#value} always overrides any value set at the class level.
* @since 3.1.2
*/
String value() default "";
定时任务执行类
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
@EnableScheduling
public class ScheduledService {
@Autowired
@Qualifier("npmExecutor")
private ThreadPoolExecutor executor;
@Autowired
private TestFuture testFuture;
@Autowired
private TestMulti testMulti;
@Scheduled(cron = "*/3 * * * * ?")
@Async("npmExecutor")
public void testMulti() {
log.info("testMulti 执行了={}",DateUtils.formatDate(new Date()));
log.info("testMulti 激活线程数={},任务数={},已完成数={},队列大小={}",
executor.getActiveCount(),executor.getTaskCount(),executor.getCompletedTaskCount(),executor.getQueue().size());
try {
for (int i =0;i<2;i++){
Future<String> submit = executor.submit(testFuture);
log.info("执行结果={}",submit.get());
log.info("执行中 激活线程数={},任务数={},已完成数={}",executor.getActiveCount(),executor.getTaskCount(),executor.getCompletedTaskCount());
}
} catch (Exception e) {
log.error("alarm 获取数据失败={}", e);
}
}
}
配置线程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
public class NpmThreadPoolConfig {
@Bean("npmExecutor")
public ThreadPoolExecutor getExecutor() {
ThreadPoolExecutor taskExecutor = new ThreadPoolExecutor(30, 50, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue(1));
taskExecutor.setThreadFactory(new NamedThreadFactory("npm"));
taskExecutor.setRejectedExecutionHandler(new MyRejectHandle());
return taskExecutor;
}
}
自定义拒绝策略
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
//@Configuration
@Slf4j
public class MyRejectHandle implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("you task is rejected." + executor.toString());
}
}
任务类
@Slf4j
@Service
public class TestFuture implements Callable<String> {
AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public String call() throws Exception {
Thread.sleep(30000);
return "TestFuture:"+atomicInteger.incrementAndGet()+":"+Thread.currentThread().getName();
}
}
网友评论