一、@Async介绍
“异步调用”对应的是“同步调用”,同步调用指程序按照定义顺序依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;异步调用指程序在顺序执行时,不等待异步调用的语句返回结果就执行后面的程序。
顾名思义,@Async是用来实现异步的。基于@Async的方法,称之为异步方法。这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。
假如我们有一个Task类,其中有三个任务需要异步执行,那么我们就可以将这些任务方法标上@Async注解,使其成为异步方法。代码如下:
@Component
public class AsyncTask {
private static Random random = new Random();
@Async
public void doTaskOne() throws Exception {
System.out.println("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
}
@Async
public void doTaskTwo() throws Exception {
System.out.println("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
}
@Async
public void doTaskThree() throws Exception {
System.out.println("开始做任务三");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
}
}
为了让@Async注解能够生效,还需要在Spring Boot的主程序中配置@EnableAsync,如下所示:
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
然后我们可以写一个单元测试进行测试一下:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
public class ApplicationTests {
@Autowired
private Task task;
@Test
public void test() throws Exception {
task.doTaskOne();
task.doTaskTwo();
task.doTaskThree();
}
}
这时你会发现,你的异步线程还没执行完毕 ,主线程就已经执行完了,导致你想要输出的语句没有在你主线程结束前及时输出等一系列问题,这时候就需要Future来协助你了。
二、Future介绍
Future提供了三种功能: - 判断任务是否完成; - 能够中断任务; - 能够获取任务执行结果
它声明这样的五个方法:
- cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
- isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
- isDone方法表示任务是否已经完成,若任务完成,则返回true;
- get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
- get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
这样我们就能增加方法判断异步调用是否结束
@Async
public Future<String> doTaskOne() throws Exception {
System.out.println("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务一完成");
}
单元测试方法:
@Test
public void asyncTaskTest() throws Exception {
long start = System.currentTimeMillis();
Future<String> task1 = asyncTask.doTaskOne();
Future<String> task2 = asyncTask.doTaskTwo();
Future<String> task3 = asyncTask.doTaskThree();
// 三个任务都调用完成,退出循环等待
while (!task1.isDone() || !task2.isDone() || !task3.isDone()) {
Thread.sleep(1000);
}
long end = System.currentTimeMillis();
System.out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
}
结果:
开始做任务一
开始做任务二
开始做任务三
完成任务二,耗时:5352毫秒
完成任务一,耗时:7190毫秒
完成任务三,耗时:7525毫秒
任务全部完成,总耗时:8004毫秒
到这里 一个简单的线程异步调用就结束了,但是,还可以进行优化处理,就是增加线程池,因为这样就可以自己规划线程创建的数量,进行资源效率利用的最大化处理。下面的Demo就是一个很好的例子;
三、线程并发处理Demo
- 1、生成线程配置类及配置文件
package com.zfsoft.async;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* 异步线程池配置
* @author jiaq
*
*/
public class OneWindowAsyncConfigurer implements AsyncConfigurer{
@Value("${zfsoft.threadPool.corePoolSize: 20}")
private int corePoolSize;
@Value("${zfsoft.threadPool.queueCapacity: 100}")
private int queueCapacity;
@Value("${zfsoft.threadPool.maxPoolSize: 80}")
private int maxPoolSize;
@Value("${zfsoft.threadPool.keepAliveSeconds: 100}")
private int keepAliveSeconds;
@Bean(name = "asyncOneWindowPool")
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数50:线程池创建时候初始化的线程数
executor.setCorePoolSize(corePoolSize);
//用来缓冲执行任务的队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE
executor.setQueueCapacity(queueCapacity);
// 线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程,默认为Integer.MAX_VALUE
executor.setMaxPoolSize(maxPoolSize);
//当超过了核心线程出之外的线程在空闲时间到达之后会被销毁,默认为60s
executor.setKeepAliveSeconds(keepAliveSeconds);
//线程池名的前缀
executor.setThreadNamePrefix("SpringAsyncThread-");
//线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
asyncThreadConfig.properties
使用配置文件动态配置
## 多线程 异步资源全局配置
## 核心线程数
zfsoft.threadPool.corePoolSize=20
## 队列大小
zfsoft.threadPool.queueCapacity=100
## 线程池最大的线程数
zfsoft.threadPool.maxPoolSize=100
## 线程最大空闲时间
zfsoft.threadPool.keepAliveSeconds=100
- 2、@Async注解进行spring注入
asyncAnnotationBeanPostProcessor
<!-- @Value注解 -->
<bean id="propertyConfigurer" class = "org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer"></bean>
<!-- async注入spring -->
<bean id="asyncAnnotationBeanPostProcessor" class="org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor"/></bean>
<!-- 异步线程池配置 -->
<bean id="oneWindowAsyncConfigurer" class="com.zfsoft.async.OneWindowAsyncConfigurer"/></bean>
- 3、业务代码书写
/**
* 数据查询
* @param mv
* @param queryParam
* @param dirList
*/
public void messageQuery(ModelAndView mv,BlCaseZJStatisticsParam queryParam,List<String> dirList,ConcurrentHashMap<String, String> messageMap) throws Exception{
List<Future<String>> resultList = new ArrayList<>(15);
long xmlSt = System.currentTimeMillis();
// 异步调用
Future<String> stringFuture1 = blCaseZJStatisticsService.wssbQuantityCensus1(queryParam, dirList,messageMap);
resultList.add(stringFuture1);
Future<String> stringFuture2 = blCaseZJStatisticsService.blgcjsQuantityCensus2(queryParam,dirList,messageMap);
resultList.add(stringFuture2);
// 当执行成功移除map中数据,全部移除说明执行完成
Future<String> tempFuture = null;
Iterator<Future<String>> iterator = null;
while(true) {
// 避免死循环,设置超时时间
long xmlCur = System.currentTimeMillis();
if (xmlCur - xmlSt > 15000) {
System.out.println("统计超时!");
throw new RuntimeException("统计超时!");
}
if (resultList==null || resultList.isEmpty()) {
long xmlEt = System.currentTimeMillis();
System.out.println("统计总耗时:"+ (xmlEt - xmlSt) +"ms");
break;
}
iterator = resultList.iterator();
while (iterator.hasNext()) {
tempFuture = iterator.next();
if (tempFuture.isDone()) {
iterator.remove();
}
}
}
}
@Async("asyncOneWindowPool")
@Override
public Future<String> wssbQuantityCensus1( BlCaseZJStatisticsParam queryParam,List<String> dirList,Map<String, String> messageMap) throws Exception {
List<Map<String, String>> wssbQuantityCensus = BlCaseZJStatisticsDao.getWssbQuantityCensus(queryParam,dirList);
int size = 0;
if( wssbQuantityCensus!=null && wssbQuantityCensus.size()>0 ){
size = wssbQuantityCensus.size();
}
messageMap.put("wssbCount",String.valueOf(size));
return new AsyncResult<>(size+"");
}
@Async("asyncOneWindowPool")
@Override
public Future<String> blgcjsQuantityCensus2( BlCaseZJStatisticsParam queryParam,List<String> dirList,Map<String, String> messageMap) throws Exception {
List<Map<String,String>> projectList = BlCaseZJStatisticsDao.getByslgcjsxms(queryParam,dirList);
int size = 0;
if(projectList!=null&&projectList.size()>0){
size = projectList.size();
}
messageMap.put("blgcjsbCount",String.valueOf(size));
return new AsyncResult<>(size+"");
}
大功告成
四、可能遇到的问题
1、@Async所修饰的函数不要定义为static类型,这样异步调用不会生效。
2、不要在同一个类里去调用@Async所修饰异步的方法,也就是调用的方法和被异步调用的方法在一个类中,这样可能导致调用失败,因为在一个类中调用,优先会进行直接调用,除非你构造一个本类的代理类。
3、多线程中如果需要使用map,不要使用hashmap,切记,因为Hashmap是线程不安全的 ,查询数据会有问题,建议使用ConcurrentHashMap
网友评论