BlockingQueue(阻塞队列)主要用在生产者/消费者的场景。
生产者/消费者图示
生产者生产入队,消费者消费出队,它作为一个提供中间过渡过程的工具,在一定程度上调整了系统的执行顺序与并发度,是一种典型的异步策略。
我第一次使用它是作为一个单独的日志收集工具。
在我们的设计中,业务接口并不应该因为日志的收集入库而阻塞其本身功能。所以我们使用线程池结合阻塞队列处理这一问题。
在业务接口中调用该日志采集方法,相当于发送一条通知过来而不是直接在当前线程中操作,“喂,我需要日志入库,信息放在这里了,你有空了自己处理一下”。然后这个通知会被安排在阻塞队列中排队,当线程有空闲时去处理它,线程池如果开启了3个线程同时工作,就可以同时处理三条通知。
如此,日志采集和业务接口就被划分开来互不影响了。当存在高并发时,该策略亦可以提升业务速度。
日志采集类
public class LogAgent {
/**
* 日志任务队列
*/
private static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);
/**
* 拒绝策略(当线程池满了之后,不使用线程池线程,而用当前线程去运行log方法。)
*/
private static RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
/**
* 日志任务线程池,初始化3个线程,最大100个任务,最大启动10个线程,线程缓存一分钟
*/
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 10, 1, TimeUnit.MINUTES, queue,handler);
private LogAgent() {
}
/**
* 日志收集方法
*
*/
public static void log(LogActiveProjectEnums project, LogActiveTypeEnums logType, String bussId,
LogActiveNameEnums activeName,String activeDesc, String activeMethod, String activeData) {
if (logType == null || project == null) {
return;
}
ActiveLogEntity activeLog = new ActiveLogEntity
.Builder(project,logType,bussId,activeName,activeDesc)
.activeData(activeData)
.activeMethod(activeMethod)
.createTime(new java.util.Date())
.createUserId("1")
.build();
log(activeLog);
}
/**
* 日志收集方法
*
* @param activelog
*/
public static void log(ActiveLogEntity activelog) {
Runnable collectThread = new LogCollectThread(activelog);
pool.execute(collectThread);
}
}
日志收集线程
这里选择继承Runnable接口是因为Runnable接口中无视异常,它不会抛出异常,不会阻塞过程。
@Slf4j
public class LogCollectThread implements Runnable {
private ActiveLogEntity activityLog;
/**
* 传入封装好的日志对象
*
* @param activelog
*/
public LogCollectThread(ActiveLogEntity activelog) {
this.activityLog = activelog;
}
@Override
public void run() {
ActiveLogManager activeLogManager = (ActiveLogManager) ApplicationContextHolder.getBean("activeLogManager");
int result = activeLogManager.create(activityLog);
if(1==result){
log.info("操作日志插入数据库成功:{}",activityLog);
}else{
log.error("操作日志插入数据库失败:{}",activityLog);
}
}
}
比如我想要入库登录操作日志,那么可以在Login接口中调用此条方法。
@PostMapping("/login")
@ApiOperation(value = "登录", notes = "用户登录接口")
@ApiResponses({
@ApiResponse(code = 80000,message = "登录失败",response = ApiResult.class),
@ApiResponse(code = 80001,message = "用户名或密码错误",response = ApiResult.class)
})
public ApiResult login(@RequestBody UserInfoDto userInfoDto) {
try {
String userName=userInfoDto.getUserName();
UserInfoVo userInfoVo = userMapper.selectByUserName(userName);
if(null == userInfoVo || !userInfoVo.getPassword().equals(userInfoDto.getPassword())){
return ApiResult.buildFail(AUTH_LOGIN_PARAM.getCode(), AUTH_LOGIN_PARAM.getDesc());
} else {
String tokenStr = JWTUtil.sign(userInfoVo);
//相当于存入token的时候,同时存入了用户的基本信息在redis里面,然后之后在redis没有过期的时候,可以直接去redis里面拿,不用解析token,也不用threadLocal。
//用户信息在有修改的时候要更新一次。
userService.addTokenToRedis(userInfoVo.getUserNum(),tokenStr);
userService.addUserInfoToRedis(userInfoVo.getUserNum(),userInfoVo);
return ApiResult.buildSuccessNormal("登录成功",tokenStr);
}
} catch (Exception e) {
log.info("登录失败,参数:{},异常:{}",userInfoDto,e);
return ApiResult.buildFail(AUTH_LOGIN.getCode(), AUTH_LOGIN.getDesc());
}finally {
LogAgent.log(LogActiveProjectEnums.GEMINI,LogActiveTypeEnums.SYSTEM,userMapper.selectByUserName(userInfoDto.getUserName()).getUserNum(),LogActiveNameEnums.LOG_LOGIN,"登录");
}
}
那么,当线程池出问题的话要怎么解决呢?
当一个任务通过execute(Runnable)方法欲添加到线程池时:
(1)如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
(2)如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
(3)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
(4)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
还有,当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
我们不使用默认的拒绝策略,因为它会抛出一个异常RejectedExecutionException,从而中断调用。我目前使用的是CallerRunsPolicy这个拒绝策略,可以在当前线程池满了之后使用业务接口的线程执行日志入库操作。可以保证日志信息的不丢失,但是或许会影响到业务接口的速度与执行结果(比如入库操作异常,会中断业务接口调用)。还有两种拒绝策略,分别是,放弃当前无法入队的线程,或者放弃队列中最古老的线程。下面参考资料中会详细介绍它们。
————————————————
//构造方法(指定拒绝策略)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//拥有默认参数的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
不同的拒绝策略有不同的适应场景,比如日志,其实并非精密数据的情况下丢掉一两篇记录也无所谓,可以选择的拒绝策略可以是,丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最旧任务(DiscardOldestPolicy),当然CallerRunsPolicy作为我选择的策略也是说得过去的,使用业务线程偶尔执行一下入库操作也并非不可,而且还能不丢失日志。
但是,如果是类似于同时上传多张图片等等这样的功能,如果我因为线程阻塞,排队太久却什么也不提示不返回,作为用户我可能要炸,这时候干脆设置超时,然后直接抛出异常,告诉用户,超出我能力限制了,我没有这个金刚钻,做不来瓷器活儿,反倒皆大欢喜。因为你不可以说强行丢掉他的资料,或者继续无止境地等待。
根据业务场景,系统规模,来设置一定的度和边界,是保障代码健壮性的关键。
当然,默认的四种拒绝策略只是常用的选择,最适合的其实还是自己写适应业务场景的拒绝策略,量身定做,设定各种情况下的针对性操作。
可以看下BlockingQueue的几种方法
BlockingQueue方法
我之前没明白的是 为啥阻塞队列能和线程池的拒绝策略一起用。因为阻塞队列的offer方法,加不进去会直接返回false 而put方法会一直阻塞 ,add则直接报错。
参考资料:
生产者/消费者模型
BlockingQueue介绍
拒绝策略详解
从一个故障看BlockingQueue的应用场景
网友评论