public class TestQueue {
//初始化阻塞队列
private static ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(100);
static {
init();
}
public static void init() {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
Integer task = null;
try {
task = arrayBlockingQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
//多线程来消费
Integer finalTask = task;
fixedThreadPool.execute(() -> {
System.out.println("任务消费了:"+finalTask);
});
}
}).start();
}
//生产任务
public static void productTask(){
int i=0;
while (i<10000){
if(arrayBlockingQueue.size()<100){
arrayBlockingQueue.add(i);
System.out.println("任务产生了:"+i);
i++;
}
}
}
public static void main(String[] args) {
productTask();
}
}
结果:
image.png image.png
image.png
image.png
实际上,上述代码有一个漏洞,就是消费者使用的线程池使用的是无限队列,如果消费比较慢,可能会导致oom;
优化:
1.自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 30,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new BlockRejectedExecutionHandler ());
2.重写拒绝策略;
//利用阻塞对立阻塞的属性,往队列丢任务,队列已满,不丢弃任务,等待
public class BlockRejectedExecutionHandler implements RejectedExecutionHandler {
private final Logger log = LoggerFactory.getLogger(BlockRejectedExecutionHandler .class);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
//利用阻塞对立阻塞的属性,往队列丢任务,队列已满,不丢弃任务,等待
executor.getQueue().put(r);
} catch (InterruptedException e) {
log.warn("Interrupted!", e);
Thread.currentThread().interrupt();
}
}
}
当然线程池的拒绝策略有很多,根据不同的业务场景选择合适的拒绝策略,以上拒绝策略适合刷数据时,不丢弃任务
网友评论