美文网首页
BlockingQueue学习与应用

BlockingQueue学习与应用

作者: hekirakuno | 来源:发表于2019-08-29 11:11 被阅读0次

BlockingQueue(阻塞队列)主要用在生产者/消费者的场景。


生产者/消费者图示

生产者生产入队,消费者消费出队,它作为一个提供中间过渡过程的工具,在一定程度上调整了系统的执行顺序与并发度,是一种典型的异步策略。
我第一次使用它是作为一个单独的日志收集工具。
在我们的设计中,业务接口并不应该因为日志的收集入库而阻塞其本身功能。所以我们使用线程池结合阻塞队列处理这一问题。
在业务接口中调用该日志采集方法,相当于发送一条通知过来而不是直接在当前线程中操作,“喂,我需要日志入库,信息放在这里了,你有空了自己处理一下”。然后这个通知会被安排在阻塞队列中排队,当线程有空闲时去处理它,线程池如果开启了3个线程同时工作,就可以同时处理三条通知。
如此,日志采集和业务接口就被划分开来互不影响了。当存在高并发时,该策略亦可以提升业务速度。

日志采集类

public class LogAgent {

    /**
     * 日志任务队列
     */
    private static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);

    /**
     * 拒绝策略(当线程池满了之后,不使用线程池线程,而用当前线程去运行log方法。)
     */
    private static RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

    /**
     * 日志任务线程池,初始化3个线程,最大100个任务,最大启动10个线程,线程缓存一分钟
     */
    private static ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 10, 1, TimeUnit.MINUTES, queue,handler);

    private LogAgent() {
    }

    /**
     * 日志收集方法
     *
     */
        public static void log(LogActiveProjectEnums project, LogActiveTypeEnums logType, String bussId,
            LogActiveNameEnums activeName,String activeDesc, String activeMethod, String activeData) {

        if (logType == null || project == null) {
            return;
        }
        ActiveLogEntity activeLog = new ActiveLogEntity
                .Builder(project,logType,bussId,activeName,activeDesc)
                .activeData(activeData)
                .activeMethod(activeMethod)
                .createTime(new java.util.Date())
                .createUserId("1")
                .build();
        log(activeLog);
    }

    /**
     * 日志收集方法
     *
     * @param activelog
     */
    public static void log(ActiveLogEntity activelog) {
        Runnable collectThread = new LogCollectThread(activelog);
        pool.execute(collectThread);
    }

}

日志收集线程
这里选择继承Runnable接口是因为Runnable接口中无视异常,它不会抛出异常,不会阻塞过程。

@Slf4j
public class LogCollectThread implements Runnable {

    private ActiveLogEntity activityLog;
    /**
     * 传入封装好的日志对象
     * 
     * @param activelog
     */
    public LogCollectThread(ActiveLogEntity activelog) {
        this.activityLog = activelog;
    }

    @Override
    public void run() {
        ActiveLogManager activeLogManager = (ActiveLogManager) ApplicationContextHolder.getBean("activeLogManager");
        int result = activeLogManager.create(activityLog);
        if(1==result){
            log.info("操作日志插入数据库成功:{}",activityLog);
        }else{
            log.error("操作日志插入数据库失败:{}",activityLog);
        }
    }
}

比如我想要入库登录操作日志,那么可以在Login接口中调用此条方法。

@PostMapping("/login")
    @ApiOperation(value = "登录", notes = "用户登录接口")
    @ApiResponses({
            @ApiResponse(code = 80000,message = "登录失败",response = ApiResult.class),
            @ApiResponse(code = 80001,message = "用户名或密码错误",response = ApiResult.class)
    })
    public ApiResult login(@RequestBody UserInfoDto userInfoDto) {
        try {
            String userName=userInfoDto.getUserName();
            UserInfoVo userInfoVo = userMapper.selectByUserName(userName);
            if(null == userInfoVo || !userInfoVo.getPassword().equals(userInfoDto.getPassword())){
                return ApiResult.buildFail(AUTH_LOGIN_PARAM.getCode(), AUTH_LOGIN_PARAM.getDesc());
            } else {
                String tokenStr = JWTUtil.sign(userInfoVo);
                //相当于存入token的时候,同时存入了用户的基本信息在redis里面,然后之后在redis没有过期的时候,可以直接去redis里面拿,不用解析token,也不用threadLocal。
                //用户信息在有修改的时候要更新一次。
                userService.addTokenToRedis(userInfoVo.getUserNum(),tokenStr);
                userService.addUserInfoToRedis(userInfoVo.getUserNum(),userInfoVo);
                return ApiResult.buildSuccessNormal("登录成功",tokenStr);
            }
        } catch (Exception e) {
            log.info("登录失败,参数:{},异常:{}",userInfoDto,e);
            return ApiResult.buildFail(AUTH_LOGIN.getCode(), AUTH_LOGIN.getDesc());
        }finally {
            LogAgent.log(LogActiveProjectEnums.GEMINI,LogActiveTypeEnums.SYSTEM,userMapper.selectByUserName(userInfoDto.getUserName()).getUserNum(),LogActiveNameEnums.LOG_LOGIN,"登录");
        }
    }

那么,当线程池出问题的话要怎么解决呢?

当一个任务通过execute(Runnable)方法欲添加到线程池时:
(1)如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
(2)如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
(3)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
(4)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
还有,当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

我们不使用默认的拒绝策略,因为它会抛出一个异常RejectedExecutionException,从而中断调用。我目前使用的是CallerRunsPolicy这个拒绝策略,可以在当前线程池满了之后使用业务接口的线程执行日志入库操作。可以保证日志信息的不丢失,但是或许会影响到业务接口的速度与执行结果(比如入库操作异常,会中断业务接口调用)。还有两种拒绝策略,分别是,放弃当前无法入队的线程,或者放弃队列中最古老的线程。下面参考资料中会详细介绍它们。
————————————————

//构造方法(指定拒绝策略)
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
//拥有默认参数的构造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

不同的拒绝策略有不同的适应场景,比如日志,其实并非精密数据的情况下丢掉一两篇记录也无所谓,可以选择的拒绝策略可以是,丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最旧任务(DiscardOldestPolicy),当然CallerRunsPolicy作为我选择的策略也是说得过去的,使用业务线程偶尔执行一下入库操作也并非不可,而且还能不丢失日志。
但是,如果是类似于同时上传多张图片等等这样的功能,如果我因为线程阻塞,排队太久却什么也不提示不返回,作为用户我可能要炸,这时候干脆设置超时,然后直接抛出异常,告诉用户,超出我能力限制了,我没有这个金刚钻,做不来瓷器活儿,反倒皆大欢喜。因为你不可以说强行丢掉他的资料,或者继续无止境地等待。
根据业务场景,系统规模,来设置一定的度和边界,是保障代码健壮性的关键。

当然,默认的四种拒绝策略只是常用的选择,最适合的其实还是自己写适应业务场景的拒绝策略,量身定做,设定各种情况下的针对性操作。

可以看下BlockingQueue的几种方法


BlockingQueue方法

我之前没明白的是 为啥阻塞队列能和线程池的拒绝策略一起用。因为阻塞队列的offer方法,加不进去会直接返回false 而put方法会一直阻塞 ,add则直接报错。

参考资料:
生产者/消费者模型
BlockingQueue介绍
拒绝策略详解
从一个故障看BlockingQueue的应用场景

相关文章

  • BlockingQueue学习与应用

    BlockingQueue(阻塞队列)主要用在生产者/消费者的场景。 生产者生产入队,消费者消费出队,它作为一个提...

  • Java - ArrayBlockingQueue设计原理

    ArrayBlockingQueue与LinkedBlockingQueue都是BlockingQueue的实现,...

  • BlockingQueue的概述

    声明:仅仅对比介绍下BlockingQueue和其各个子接口与实现类 基接口BlockingQueue 概述...

  • Java - BlockingQueue学习

    BlockingQueue是阻塞队列,继承Queue,在Queue的基础上添加了阻塞接口,实现阻塞功能。 Bloc...

  • BlockingQueue和CountDownLatch

    以下内容整理自互联网,仅用于个人学习 BlockingQueue BlockingQueue接口定义了一种阻塞的F...

  • 阻塞队列 BlockingQueue

    阻塞队列 BlockingQueue BlockingQueue用法 BlockingQueue 通常用于一个线...

  • 生产者与消费者

    使用BlockingQueue模拟生产者与消费者

  • Base

    Java基础 [ArrayList、LinkedList、Vector区别] [BlockingQueue与Cou...

  • BlockingQueue

    BlockingQueue 什么是BlockingQueue? A Queue that additionally...

  • BlockingQueue

    了解 BlockingQueue: BlockingQueue 位于 java.util.concurrent ...

网友评论

      本文标题:BlockingQueue学习与应用

      本文链接:https://www.haomeiwen.com/subject/bsubectx.html