美文网首页
实战阻塞队列的生产与消费

实战阻塞队列的生产与消费

作者: b335eb9201c3 | 来源:发表于2021-03-11 11:04 被阅读0次
    public class TestQueue {
        //初始化阻塞队列
        private static ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(100);
    
        static {
            init();
        }
        public static void init() {
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
            new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    Integer task = null;
                    try {
                        task = arrayBlockingQueue.take();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    //多线程来消费
                    Integer finalTask = task;
                    fixedThreadPool.execute(() -> {
                        System.out.println("任务消费了:"+finalTask);
                    });
                }
            }).start();
        }
    
        //生产任务
        public static void productTask(){
            int i=0;
            while (i<10000){
                if(arrayBlockingQueue.size()<100){
                    arrayBlockingQueue.add(i);
                    System.out.println("任务产生了:"+i);
                    i++;
                }
    
            }
        }
    
        public static void main(String[] args) {
            productTask();
        }
    }
    

    结果:


    image.png image.png
    image.png
    image.png

    实际上,上述代码有一个漏洞,就是消费者使用的线程池使用的是无限队列,如果消费比较慢,可能会导致oom;

    优化:
    1.自定义线程池

    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 30, 
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new BlockRejectedExecutionHandler ());
    

    2.重写拒绝策略;
    //利用阻塞对立阻塞的属性,往队列丢任务,队列已满,不丢弃任务,等待

    public class BlockRejectedExecutionHandler implements RejectedExecutionHandler {
    
        private final Logger log = LoggerFactory.getLogger(BlockRejectedExecutionHandler .class);
    
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
    //利用阻塞对立阻塞的属性,往队列丢任务,队列已满,不丢弃任务,等待
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                log.warn("Interrupted!", e);
                Thread.currentThread().interrupt();
            }
        }
    
    }
    

    当然线程池的拒绝策略有很多,根据不同的业务场景选择合适的拒绝策略,以上拒绝策略适合刷数据时,不丢弃任务

    相关文章

      网友评论

          本文标题:实战阻塞队列的生产与消费

          本文链接:https://www.haomeiwen.com/subject/zyunqltx.html