美文网首页
实战阻塞队列的生产与消费

实战阻塞队列的生产与消费

作者: 小民自愚 | 来源:发表于2021-03-11 11:04 被阅读0次
public class TestQueue {
    //初始化阻塞队列
    private static ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(100);

    static {
        init();
    }
    public static void init() {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Integer task = null;
                try {
                    task = arrayBlockingQueue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                //多线程来消费
                Integer finalTask = task;
                fixedThreadPool.execute(() -> {
                    System.out.println("任务消费了:"+finalTask);
                });
            }
        }).start();
    }

    //生产任务
    public static void productTask(){
        int i=0;
        while (i<10000){
            if(arrayBlockingQueue.size()<100){
                arrayBlockingQueue.add(i);
                System.out.println("任务产生了:"+i);
                i++;
            }

        }
    }

    public static void main(String[] args) {
        productTask();
    }
}

结果:


image.png image.png
image.png
image.png

实际上,上述代码有一个漏洞,就是消费者使用的线程池使用的是无限队列,如果消费比较慢,可能会导致oom;

优化:
1.自定义线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 30, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new BlockRejectedExecutionHandler ());

2.重写拒绝策略;
//利用阻塞对立阻塞的属性,往队列丢任务,队列已满,不丢弃任务,等待

public class BlockRejectedExecutionHandler implements RejectedExecutionHandler {

    private final Logger log = LoggerFactory.getLogger(BlockRejectedExecutionHandler .class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
//利用阻塞对立阻塞的属性,往队列丢任务,队列已满,不丢弃任务,等待
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            log.warn("Interrupted!", e);
            Thread.currentThread().interrupt();
        }
    }

}

当然线程池的拒绝策略有很多,根据不同的业务场景选择合适的拒绝策略,以上拒绝策略适合刷数据时,不丢弃任务

相关文章

  • Android中的线程与线程池

    阻塞队列BlockingQueue 阻塞队列常用于生产者——消费者模型,生产者往阻塞队列插入数据,消费者往阻塞队列...

  • 实战阻塞队列的生产与消费

    结果: 实际上,上述代码有一个漏洞,就是消费者使用的线程池使用的是无限队列,如果消费比较慢,可能会导致oom; 优...

  • 什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型?

    什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型?

  • Java - SynchronousQueue学习使用

    SynchronousQueue叫同步队列,它与其他队列的不同之处是生产put与消费take都是阻塞操作,当生产的...

  • 阻塞队列

    1.阻塞队列定义阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。...

  • 生产者消费者

    生产者/消费者模式(阻塞队列) 生产者消费者模型的实现

  • java并发容器-BlockingQueue

    线程安全的阻塞队列,用来处理 生产者-消费者 问题。当队列容器满时,生产者线程被阻塞直到队列未满。当队列容器为空时...

  • 12:Java并发编程实战

    1:生产者和消费者模式下的并发案例 生产者和消费者之间通过阻塞队列来进行通信,阻塞队列相当于一个缓存区,平衡了生产...

  • ArrayBlockingQueue 学习

    生产者消费者模式最核心的部分是生产者与消费者之间的特殊容器,而阻塞队列是特殊容器最常见的实现。JDK中定义了阻塞队...

  • (十三)J.U.C-BlockingQueue

    BlockingQueue阻塞队列 主要应用场景:生产者消费者模型,是线程安全的 阻塞情况: 1、当队列满了进行入...

网友评论

      本文标题:实战阻塞队列的生产与消费

      本文链接:https://www.haomeiwen.com/subject/zyunqltx.html