美文网首页多线程并发
Java中的阻塞队列

Java中的阻塞队列

作者: 肥兔子爱豆畜子 | 来源:发表于2021-07-05 16:56 被阅读0次

总览

虽然说在并发数据结构里头,后边出现了一些高性能的结构,比如jctools,disruptor之类的,但是java自带的阻塞队列是基础、用的也广泛,还是应该掌握。阻塞队列的阻塞,体现在两个地方:

  • 队列中没有数据的情况下,消费端线程会被自动阻塞挂起,有数据进入队列时被唤醒为可执行状态。
  • 队列已满的情况,生产端线程会被阻塞挂起,直到队列中有空位被唤醒。

再一个,java中自带的阻塞队列都是需要用锁的,ReentrantLock,放入也好、消费也好,用锁来保证同时只有一个线程能够放入或消费。

所有阻塞队列都是实现了interface BlockingQueue接口,所以关键的方法都在这个接口里边定义。

ArrayBlockingQueue

数组实现的有界队列,有公平和非公平两种模式,默认非公平,非公平比公平模式性能高。

BlockingQueue unfairQueue  = new ArrayBlockingQueue(100);
BlockingQueue fairQueue  = new ArrayBlockingQueue(100, true);

所谓公平访问模式,就是先到达队列而进入阻塞的线程,先放入或消费。

LinkedBlockingQueue

默认是链表实现的无界队列(Integer.MAX_VALUE),也可以指定长度成为有界队列。

BlockingQueue linkedQueue = new LinkedBlockingQueue(100); 
BlockingQueue linkedQueue = new LinkedBlockingQueue(); //Integer.MAX_VALUE

LinkedBlockingQueue在生产端和消费端是两把独立的锁,生产者和消费者线程可以并行处理队列中的数据,高并发下性能更好一些。

PriorityBlockingQueue

支持优先级的无界队列,内部是数组,可以根据情况进行自动扩容。具体使用的话,要么是需要队列里的元素实现Comparable接口进而Override接口的compareTo方法,或者是初始化PriorityBlockingQueue的时候在构造方法传入Comparator。这样PriorityBlockingQueue队列里的元素就是按照我们自定义的顺序从小到大排列了。

public class MyElement implements Comparable<MyElement>{
    
    private int value;

    @Override
    public int compareTo(MyElement o) {
        if(this.value > o.value)
            return 1;
        if(this.value < o.value)
            return -1;
        return 0;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }

}
BlockingQueue<MyElement> priorityQueue = new PriorityBlockingQueue<>();
MyElement e1 = new MyElement();
e1.setValue(6);
MyElement e2 = new MyElement();
e2.setValue(3);
priorityQueue.offer(e1);
priorityQueue.offer(e2);
logger.info("priorityQueue size" + priorityQueue.size());
logger.info("priorityQueue" + priorityQueue.remove().getValue());
logger.info("priorityQueue" + priorityQueue.remove().getValue());
logger.info("priorityQueue size" + priorityQueue.size());

运行结果:

priorityQueue size2
priorityQueue3
priorityQueue6
priorityQueue size0

DelayQueue 延迟队列

DelayQueue队列中的元素必须实现Delayed接口,Override getDelay方法,创建元素时指定多久才能从队列里获取这个元素,DelayQueue基于PriorityQueue实现,所以也就是同时也要实现compareTo方法。所以一定要让快到期的元素优先级高、从队列里先取出来。DelayQueue一般有如下使用场景:

  • 缓存失效 ,可以将缓存信息存入缓存的同时将其失效时间放入DelayQueue,启动一个线程从队列中取元素,这样一旦从队列里取出了元素,那么对应的缓存也就该失效了,线程去缓存里清除对应的数据。
  • 定时任务 ,使用DelayQueue存当天将会执行的任务和执行时间,一旦从队列中取出元素就开始执行任务。
    代码模拟使用DelayQueue延迟执行2个任务:
public class MyDelayElement implements Delayed{
    
    private long startTime; //任务开始时间, ms
    private int value; //代表任务编号
    
    public MyDelayElement(int value, long startTime) {
        this.value = value;
        this.startTime = startTime;
    }
    
    public int getValue() {
        return value;
    }
    
    //按到期时间由小到大排序
    @Override
    public int compareTo(Delayed o) {
        if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
            return 1;
        if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
            return -1;
        return 0;
    }
    
    //任务还剩多长时间才能执行
    @Override
    public long getDelay(TimeUnit unit) {
        //long startTimeNano = TimeUnit.NANOSECONDS.convert(startTime, TimeUnit.MILLISECONDS);
        long remainTime = unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        
        return remainTime;
    }

}
BlockingQueue<MyDelayElement> delayQueue = new DelayQueue<>();
long currentTime = System.currentTimeMillis();
MyDelayElement e3 = new MyDelayElement(23, currentTime + 60*1000);
MyDelayElement e4 = new MyDelayElement(9, currentTime + 90*1000);
delayQueue.offer(e4);
delayQueue.offer(e3);
logger.info("两个元素放入DelayQueue延迟队列,分别延迟1分钟和1分半取出");
logger.info("delayQueue size:" + delayQueue.size());
Thread t1 = new Thread(new Runnable() {

    @Override
    public void run() {
        try {
            while(true) {
                MyDelayElement e = delayQueue.take();
                logger.info("取出元素,value=" + e.getValue());
                logger.info("delayQueue size:" + delayQueue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
    }
    
});
t1.start();
t1.join();

运行结果

两个元素放入DelayQueue延迟队列,分别延迟1分钟和1分半取出
delayQueue size:2
取出元素,value=23
delayQueue size:1
取出元素,value=9
delayQueue size:0

SynchronousQueue

这是一个没有容量、也就是不存储元素的阻塞队列,每一个put必须等待一个take操作,否则添加不进元素。用于将一个线程中使用的数据传递给另一个线程使用。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

LinkedTransferQueue

链表构成的无界阻塞TransferQueue队列,比其他阻塞队列多了transfer方法和tryTransfer方法:

  • transfer方法 , 生产者传入元素时如果正好有消费者在等待消费元素,那么立刻将元素从生产者transfer传输给消费者,否则生产者将元素放入队列并且等待该元素被消费后再返回。
  • tryTransfer方法, 生产者传入元素时正好有消费者、则传输给消费者并返回true,否则立刻返回false。

LinkedBlockingDeque

双向链表阻塞队列

相关文章

  • 以LinkedBlockingQueue为例浅谈阻塞队列的实现

    目录 阻塞队列简介阻塞队列的定义Java中的阻塞队列 LinkedBlockingQueue单链表定义锁和等待队列...

  • Java阻塞队列四组API介绍

    Java阻塞队列四组API介绍 通过前面几篇文章的学习,我们已经知道了Java中的队列分为阻塞队列和非阻塞队列以及...

  • ArrayBlockingQueue源码解析

    在 Java8 中,提供了 7 个阻塞队列 ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列...

  • 19-阻塞队列之ArrayBlockingQueue

    Java中的阻塞队列 什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附...

  • Java中的阻塞队列

    Java中的阻塞队列 1. 什么是阻塞队列 阻塞队列是支持两个附加操作的队列。这两个附加操作就是阻塞式的插入和移除...

  • 一切尽在代码中:一文阐述队列的使用JAVA

    阻塞队列如下: 关于java.util 中提供的队列的方法们: 阻塞队列包括了非阻塞队列中的大部分方法,如上五个方...

  • 并发 - Java并发容器和框架

    Java中的阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞...

  • Java中的阻塞队列

    Java中的阻塞队列 ArrayBlockingQueeue,LinkedBlockingQueue,Priori...

  • Java阻塞队列

    本篇文章主要是介绍Java并发包中的几种阻塞队列和阻塞原理 背景 阻塞队列(BlockingQueue)是一个支持...

  • Java阻塞队列

    JAVA中的几种主要的阻塞队列 ArrayBlockingQueue: 基于数组实现的一个阻塞队列,在创建Arra...

网友评论

    本文标题:Java中的阻塞队列

    本文链接:https://www.haomeiwen.com/subject/ltbuultx.html