美文网首页
BlockingQueue学习与应用

BlockingQueue学习与应用

作者: hekirakuno | 来源:发表于2019-08-29 11:11 被阅读0次

    BlockingQueue(阻塞队列)主要用在生产者/消费者的场景。


    生产者/消费者图示

    生产者生产入队,消费者消费出队,它作为一个提供中间过渡过程的工具,在一定程度上调整了系统的执行顺序与并发度,是一种典型的异步策略。
    我第一次使用它是作为一个单独的日志收集工具。
    在我们的设计中,业务接口并不应该因为日志的收集入库而阻塞其本身功能。所以我们使用线程池结合阻塞队列处理这一问题。
    在业务接口中调用该日志采集方法,相当于发送一条通知过来而不是直接在当前线程中操作,“喂,我需要日志入库,信息放在这里了,你有空了自己处理一下”。然后这个通知会被安排在阻塞队列中排队,当线程有空闲时去处理它,线程池如果开启了3个线程同时工作,就可以同时处理三条通知。
    如此,日志采集和业务接口就被划分开来互不影响了。当存在高并发时,该策略亦可以提升业务速度。

    日志采集类

    public class LogAgent {
    
        /**
         * 日志任务队列
         */
        private static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);
    
        /**
         * 拒绝策略(当线程池满了之后,不使用线程池线程,而用当前线程去运行log方法。)
         */
        private static RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
    
        /**
         * 日志任务线程池,初始化3个线程,最大100个任务,最大启动10个线程,线程缓存一分钟
         */
        private static ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 10, 1, TimeUnit.MINUTES, queue,handler);
    
        private LogAgent() {
        }
    
        /**
         * 日志收集方法
         *
         */
            public static void log(LogActiveProjectEnums project, LogActiveTypeEnums logType, String bussId,
                LogActiveNameEnums activeName,String activeDesc, String activeMethod, String activeData) {
    
            if (logType == null || project == null) {
                return;
            }
            ActiveLogEntity activeLog = new ActiveLogEntity
                    .Builder(project,logType,bussId,activeName,activeDesc)
                    .activeData(activeData)
                    .activeMethod(activeMethod)
                    .createTime(new java.util.Date())
                    .createUserId("1")
                    .build();
            log(activeLog);
        }
    
        /**
         * 日志收集方法
         *
         * @param activelog
         */
        public static void log(ActiveLogEntity activelog) {
            Runnable collectThread = new LogCollectThread(activelog);
            pool.execute(collectThread);
        }
    
    }
    

    日志收集线程
    这里选择继承Runnable接口是因为Runnable接口中无视异常,它不会抛出异常,不会阻塞过程。

    @Slf4j
    public class LogCollectThread implements Runnable {
    
        private ActiveLogEntity activityLog;
        /**
         * 传入封装好的日志对象
         * 
         * @param activelog
         */
        public LogCollectThread(ActiveLogEntity activelog) {
            this.activityLog = activelog;
        }
    
        @Override
        public void run() {
            ActiveLogManager activeLogManager = (ActiveLogManager) ApplicationContextHolder.getBean("activeLogManager");
            int result = activeLogManager.create(activityLog);
            if(1==result){
                log.info("操作日志插入数据库成功:{}",activityLog);
            }else{
                log.error("操作日志插入数据库失败:{}",activityLog);
            }
        }
    }
    

    比如我想要入库登录操作日志,那么可以在Login接口中调用此条方法。

    @PostMapping("/login")
        @ApiOperation(value = "登录", notes = "用户登录接口")
        @ApiResponses({
                @ApiResponse(code = 80000,message = "登录失败",response = ApiResult.class),
                @ApiResponse(code = 80001,message = "用户名或密码错误",response = ApiResult.class)
        })
        public ApiResult login(@RequestBody UserInfoDto userInfoDto) {
            try {
                String userName=userInfoDto.getUserName();
                UserInfoVo userInfoVo = userMapper.selectByUserName(userName);
                if(null == userInfoVo || !userInfoVo.getPassword().equals(userInfoDto.getPassword())){
                    return ApiResult.buildFail(AUTH_LOGIN_PARAM.getCode(), AUTH_LOGIN_PARAM.getDesc());
                } else {
                    String tokenStr = JWTUtil.sign(userInfoVo);
                    //相当于存入token的时候,同时存入了用户的基本信息在redis里面,然后之后在redis没有过期的时候,可以直接去redis里面拿,不用解析token,也不用threadLocal。
                    //用户信息在有修改的时候要更新一次。
                    userService.addTokenToRedis(userInfoVo.getUserNum(),tokenStr);
                    userService.addUserInfoToRedis(userInfoVo.getUserNum(),userInfoVo);
                    return ApiResult.buildSuccessNormal("登录成功",tokenStr);
                }
            } catch (Exception e) {
                log.info("登录失败,参数:{},异常:{}",userInfoDto,e);
                return ApiResult.buildFail(AUTH_LOGIN.getCode(), AUTH_LOGIN.getDesc());
            }finally {
                LogAgent.log(LogActiveProjectEnums.GEMINI,LogActiveTypeEnums.SYSTEM,userMapper.selectByUserName(userInfoDto.getUserName()).getUserNum(),LogActiveNameEnums.LOG_LOGIN,"登录");
            }
        }
    

    那么,当线程池出问题的话要怎么解决呢?

    当一个任务通过execute(Runnable)方法欲添加到线程池时:
    (1)如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
    (2)如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
    (3)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
    (4)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
    还有,当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    我们不使用默认的拒绝策略,因为它会抛出一个异常RejectedExecutionException,从而中断调用。我目前使用的是CallerRunsPolicy这个拒绝策略,可以在当前线程池满了之后使用业务接口的线程执行日志入库操作。可以保证日志信息的不丢失,但是或许会影响到业务接口的速度与执行结果(比如入库操作异常,会中断业务接口调用)。还有两种拒绝策略,分别是,放弃当前无法入队的线程,或者放弃队列中最古老的线程。下面参考资料中会详细介绍它们。
    ————————————————

    //构造方法(指定拒绝策略)
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    //拥有默认参数的构造方法
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    不同的拒绝策略有不同的适应场景,比如日志,其实并非精密数据的情况下丢掉一两篇记录也无所谓,可以选择的拒绝策略可以是,丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最旧任务(DiscardOldestPolicy),当然CallerRunsPolicy作为我选择的策略也是说得过去的,使用业务线程偶尔执行一下入库操作也并非不可,而且还能不丢失日志。
    但是,如果是类似于同时上传多张图片等等这样的功能,如果我因为线程阻塞,排队太久却什么也不提示不返回,作为用户我可能要炸,这时候干脆设置超时,然后直接抛出异常,告诉用户,超出我能力限制了,我没有这个金刚钻,做不来瓷器活儿,反倒皆大欢喜。因为你不可以说强行丢掉他的资料,或者继续无止境地等待。
    根据业务场景,系统规模,来设置一定的度和边界,是保障代码健壮性的关键。

    当然,默认的四种拒绝策略只是常用的选择,最适合的其实还是自己写适应业务场景的拒绝策略,量身定做,设定各种情况下的针对性操作。

    可以看下BlockingQueue的几种方法


    BlockingQueue方法

    我之前没明白的是 为啥阻塞队列能和线程池的拒绝策略一起用。因为阻塞队列的offer方法,加不进去会直接返回false 而put方法会一直阻塞 ,add则直接报错。

    参考资料:
    生产者/消费者模型
    BlockingQueue介绍
    拒绝策略详解
    从一个故障看BlockingQueue的应用场景

    相关文章

      网友评论

          本文标题:BlockingQueue学习与应用

          本文链接:https://www.haomeiwen.com/subject/bsubectx.html