一、起因
众所周知,toncat线程数量有限,在接收到http请求的时候,为了提高系统的吞吐量,很多时候我们都会选择使用异步来进行处理,从而使得不阻塞主线程。
二、异步处理的两种方式
1. callabel调用
@Slf4j
@RestController
@RequestMapping("/async")
public class AsyncController {
/**
* <一句话功能描述>:异步处理服务
* <功能详细描述>:
* @Param:
* @Return: java.util.concurrent.Callable
*/
@GetMapping("/order")
public Callable order(){
log.info("主线程开始");
Callable<String> callable = new Callable() {
@Override
public String call() throws Exception {
log.info("副线程开始");
Thread.sleep(1000);
log.info("副线程结束");
return "success";
}
};
log.info("主线程结束");
return callable;
}
}
2. DeferredResult,多个系统间调用
- 创建DeferredResultHold保存DeferredResult
@Component
@Data
public class DeferredResultHold {
private Map<String,DeferredResult<String>>map = new HashMap<>();
}
- 自定义实体类,模拟消息中间件
@Slf4j
@Component
public class OrderQueue {
private String placeOrder;
private String completeOrder;
public String getPlaceOrder() {
return placeOrder;
}
public String getCompleteOrder() {
return completeOrder;
}
public void setPlaceOrder(String placeOrder) throws Exception {
new Thread(() -> {
log.info("下单开始:{}", placeOrder);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.completeOrder = placeOrder;
log.info("下单成功:{}", placeOrder);
}).start();
}
public void setCompleteOrder(String completeOrder) {
this.completeOrder = completeOrder;
}
}
- 监听类
@Component
public class DeferredResultListen implements CommandLineRunner {
@Autowired
private OrderQueue orderQueue;
@Autowired
private DeferredResultHold deferredResultHold;
@Override
public void run(String... args) throws Exception {
new Thread(() -> {
while (true) {
if (StringUtils.isNotBlank(orderQueue.getCompleteOrder())) {
//调用setResult通知已经完成
deferredResultHold.getMap().get(orderQueue.getCompleteOrder())
.setResult("do success");
orderQueue.setCompleteOrder(null);
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
- api访问
@Slf4j
@RestController
@RequestMapping("/deferredResult")
public class OrderController {
@Autowired
private OrderQueue orderQueue;
@Autowired
private DeferredResultHold deferredResultHold;
@GetMapping("/order")
public DeferredResult<String> order() throws Exception {
log.info("主线程开始");
String orderNumber = RandomStringUtils.randomNumeric(8);
orderQueue.setPlaceOrder(orderNumber);
DeferredResult<String> deferredResult = new DeferredResult<>();
deferredResultHold.getMap().put(orderNumber, deferredResult);
log.info("主线程开始");
return deferredResult;
}
}
3. 全局配置,自定义连接池,不使用默认配置的连接池
@Configuration
public class WebConfig {
/**
* <一句话功能描述>:异步处理请求的配置
* <功能详细描述>:
*
* @Param:
* @Return: org.springframework.web.servlet.config.annotation.WebMvcConfigurer
*/
@Bean
public WebMvcConfigurer webMvcConfigurer2() {
return new WebMvcConfigurerAdapter() {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
//自定义配置连接池
configurer.setTaskExecutor(threadPoolTaskExecutor());
}
};
}
/**
* <一句话功能描述>:创建线程池
* <功能详细描述>:
* @Param:
* @Return: org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
*/
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
//核心连接数
t.setCorePoolSize(10);
//最大连接数
t.setMaxPoolSize(50);
//线程名前缀(可自定义,标识更清楚)
t.setThreadNamePrefix("asyn");
return t;
}
}
网友评论