总览
虽然说在并发数据结构里头,后边出现了一些高性能的结构,比如jctools,disruptor之类的,但是java自带的阻塞队列是基础、用的也广泛,还是应该掌握。阻塞队列的阻塞,体现在两个地方:
- 队列中没有数据的情况下,消费端线程会被自动阻塞挂起,有数据进入队列时被唤醒为可执行状态。
- 队列已满的情况,生产端线程会被阻塞挂起,直到队列中有空位被唤醒。
再一个,java中自带的阻塞队列都是需要用锁的,ReentrantLock,放入也好、消费也好,用锁来保证同时只有一个线程能够放入或消费。
所有阻塞队列都是实现了interface BlockingQueue接口,所以关键的方法都在这个接口里边定义。
ArrayBlockingQueue
数组实现的有界队列,有公平和非公平两种模式,默认非公平,非公平比公平模式性能高。
BlockingQueue unfairQueue = new ArrayBlockingQueue(100);
BlockingQueue fairQueue = new ArrayBlockingQueue(100, true);
所谓公平访问模式,就是先到达队列而进入阻塞的线程,先放入或消费。
LinkedBlockingQueue
默认是链表实现的无界队列(Integer.MAX_VALUE),也可以指定长度成为有界队列。
BlockingQueue linkedQueue = new LinkedBlockingQueue(100);
BlockingQueue linkedQueue = new LinkedBlockingQueue(); //Integer.MAX_VALUE
LinkedBlockingQueue在生产端和消费端是两把独立的锁,生产者和消费者线程可以并行处理队列中的数据,高并发下性能更好一些。
PriorityBlockingQueue
支持优先级的无界队列,内部是数组,可以根据情况进行自动扩容。具体使用的话,要么是需要队列里的元素实现Comparable接口进而Override接口的compareTo方法,或者是初始化PriorityBlockingQueue的时候在构造方法传入Comparator。这样PriorityBlockingQueue队列里的元素就是按照我们自定义的顺序从小到大排列了。
public class MyElement implements Comparable<MyElement>{
private int value;
@Override
public int compareTo(MyElement o) {
if(this.value > o.value)
return 1;
if(this.value < o.value)
return -1;
return 0;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
}
BlockingQueue<MyElement> priorityQueue = new PriorityBlockingQueue<>();
MyElement e1 = new MyElement();
e1.setValue(6);
MyElement e2 = new MyElement();
e2.setValue(3);
priorityQueue.offer(e1);
priorityQueue.offer(e2);
logger.info("priorityQueue size" + priorityQueue.size());
logger.info("priorityQueue" + priorityQueue.remove().getValue());
logger.info("priorityQueue" + priorityQueue.remove().getValue());
logger.info("priorityQueue size" + priorityQueue.size());
运行结果:
priorityQueue size2
priorityQueue3
priorityQueue6
priorityQueue size0
DelayQueue 延迟队列
DelayQueue队列中的元素必须实现Delayed接口,Override getDelay方法,创建元素时指定多久才能从队列里获取这个元素,DelayQueue基于PriorityQueue实现,所以也就是同时也要实现compareTo方法。所以一定要让快到期的元素优先级高、从队列里先取出来。DelayQueue一般有如下使用场景:
- 缓存失效 ,可以将缓存信息存入缓存的同时将其失效时间放入DelayQueue,启动一个线程从队列中取元素,这样一旦从队列里取出了元素,那么对应的缓存也就该失效了,线程去缓存里清除对应的数据。
- 定时任务 ,使用DelayQueue存当天将会执行的任务和执行时间,一旦从队列中取出元素就开始执行任务。
代码模拟使用DelayQueue延迟执行2个任务:
public class MyDelayElement implements Delayed{
private long startTime; //任务开始时间, ms
private int value; //代表任务编号
public MyDelayElement(int value, long startTime) {
this.value = value;
this.startTime = startTime;
}
public int getValue() {
return value;
}
//按到期时间由小到大排序
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
return 0;
}
//任务还剩多长时间才能执行
@Override
public long getDelay(TimeUnit unit) {
//long startTimeNano = TimeUnit.NANOSECONDS.convert(startTime, TimeUnit.MILLISECONDS);
long remainTime = unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return remainTime;
}
}
BlockingQueue<MyDelayElement> delayQueue = new DelayQueue<>();
long currentTime = System.currentTimeMillis();
MyDelayElement e3 = new MyDelayElement(23, currentTime + 60*1000);
MyDelayElement e4 = new MyDelayElement(9, currentTime + 90*1000);
delayQueue.offer(e4);
delayQueue.offer(e3);
logger.info("两个元素放入DelayQueue延迟队列,分别延迟1分钟和1分半取出");
logger.info("delayQueue size:" + delayQueue.size());
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
while(true) {
MyDelayElement e = delayQueue.take();
logger.info("取出元素,value=" + e.getValue());
logger.info("delayQueue size:" + delayQueue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t1.join();
运行结果
两个元素放入DelayQueue延迟队列,分别延迟1分钟和1分半取出
delayQueue size:2
取出元素,value=23
delayQueue size:1
取出元素,value=9
delayQueue size:0
SynchronousQueue
这是一个没有容量、也就是不存储元素的阻塞队列,每一个put必须等待一个take操作,否则添加不进元素。用于将一个线程中使用的数据传递给另一个线程使用。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
LinkedTransferQueue
链表构成的无界阻塞TransferQueue队列,比其他阻塞队列多了transfer方法和tryTransfer方法:
- transfer方法 , 生产者传入元素时如果正好有消费者在等待消费元素,那么立刻将元素从生产者transfer传输给消费者,否则生产者将元素放入队列并且等待该元素被消费后再返回。
- tryTransfer方法, 生产者传入元素时正好有消费者、则传输给消费者并返回true,否则立刻返回false。
LinkedBlockingDeque
双向链表阻塞队列
网友评论