Queue:基本上,队列就是一个先进先出(FIFO)的数据结构
Queue接口与List、Set同一级别,都是集成了Collection接口。
Queue分为:阻塞队列实现和非阻塞队列实现,线程安全和非线程安全之分。
一、非阻塞队列(Queue)
不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue
-
PriorityQueue 类实质上维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。
-
ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大 小,ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。
![](https://img.haomeiwen.com/i8938636/b0d03efc0192946a.png)
当往队列中追加10个对象,耗时对比
PriorityQueue total cost=26072ns , each cost=2607ns
ConcurrentLinkedQueue total cost=39504ns , each cost=3950ns
当往队列里面追加10000个数据时,耗时对比
PriorityQueue total cost=1185094ns , each cost=118ns
ConcurrentLinkedQueue total cost=1157837ns , each cost=115ns
对比发现队列在一次性追加越多数量的数据似乎速度越快
所有队列的性能对比(代码最下面)
PriorityQueue add total cost=2263924ns , each cost=226ns
ConcurrentLinkedQueue add total cost=1170872ns , each cost=117ns
LinkedBlockingQueue add total cost=3611376ns , each cost=361ns
ArrayBlockingQueue add total cost=1749199ns , each cost=174ns
PriorityBlockingQueue add total cost=2289602ns , each cost=228ns
DelayQueue add total cost=2733222ns , each cost=273ns
PriorityQueue remove total cost=1747223ns , each cost=174ns
ConcurrentLinkedQueue remove total cost=1502304ns , each cost=150ns
LinkedBlockingQueue remove total cost=1677698ns , each cost=167ns
ArrayBlockingQueue remove total cost=1133345ns , each cost=113ns
PriorityBlockingQueue remove total cost=1420928ns , each cost=142ns
DelayQueue remove total cost=1947109ns , each cost=194ns
二、阻塞队列(BlockingQueue)
不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。
五个队列所提供的各有不同:
- ArrayBlockingQueue :一个由数组支持的有界队列。
- LinkedBlockingQueue :一个由链接节点支持的可选有界队列。
- PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。
- DelayQueue :一个由优先级堆支持的、基于时间的调度队列。
- SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。
add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
** element **返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
offer 添加一个元素并返回true 如果队列已满,则返回false
poll 移除并返问队列头部的元素 如果队列为空,则返回null
peek 返回队列头部的元素 如果队列为空,则返回null
put 添加一个元素 如果队列满,则阻塞
take 移除并返回队列头部的元素 如果队列为空,则阻塞
** remove、element、offer 、poll、peek 其实是属于Queue接口 **
** 阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offer、poll、peek方法。这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。**
** 注意:poll和peek方法出错进返回null。因此,向队列中插入null值是不合法的 **
** 最后,我们有阻塞操作put和take。put方法在队列满时阻塞,take方法在队列空时阻塞 **
阻塞队列说明
LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。
PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。
DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。
![](https://img.haomeiwen.com/i8938636/0e2e8f8260bbd4ed.png)
---代码:开始 简单的数据操作对比---
/**
* @Description: 非阻塞队列对比,1万次追加数据,1万次移除数据 1万次生产消费同时处理效率对比
* @Author: [Bill] on [2019/8/1 15:08]
*/
public class QueueInterTest {
public static void main(String[] args) {
//执行1万次追加数据
int count = 10000;
Queue priorityQueue = new PriorityQueue();//有序列表
Queue concurrentLinkedQueue = new ConcurrentLinkedQueue();//基于链接节点,线程安全
Queue linkedBlockingQueue = new LinkedBlockingQueue();
Queue arrayBlockingQueue = new ArrayBlockingQueue<>(count);//需要选择是否需要公平性,所以需要加入的对象实现Comparable比较接口
Queue priorityBlockingQueue = new PriorityBlockingQueue();
Queue delayQueue = new DelayQueue();//只有在延迟期满时才能从中提取元素,由于需要实现延迟需要加入的对象实现Delayed接口
List<Queue> queues = Arrays.asList(priorityQueue, concurrentLinkedQueue, linkedBlockingQueue,arrayBlockingQueue, priorityBlockingQueue, delayQueue);
queues.forEach(p -> testAddTemplate(p));
//溢出1万次获取数据
queues.forEach(p -> testRemoveTemplate(p));
}
private static void testAddTemplate(Queue priorityQueue) {
//进行GC回收
JvmUtils.restoreJVM();
//进行测试
int count = 10000;
//创建对象
TestBean testBean = new TestBean();
long start = System.nanoTime();
for (int i = 0; i < count; i++) {
priorityQueue.add(testBean);
}
long nscost = (System.nanoTime()) - start;
System.out.println(priorityQueue.getClass().getSimpleName() + " add total cost=" + nscost + "ns , each cost="
+ (nscost / count) + "ns");
JvmUtils.restoreJVM();
}
private static void testRemoveTemplate(Queue priorityQueue) {
//进行GC回收
JvmUtils.restoreJVM();
//进行测试
long start = System.nanoTime();
int size = priorityQueue.size();
for (int i = 0; i < size; i++) {
priorityQueue.remove();
}
long nscost = (System.nanoTime()) - start;
System.out.println(priorityQueue.getClass().getSimpleName() + " remove total cost=" + nscost + "ns , each cost="
+ (nscost / size) + "ns");
JvmUtils.restoreJVM();
}
---代码:结束 简单的数据操作对比---
阻塞队列使用对比 1万次生产消费同时处理效率对比。
对比代码
package com.evolution.upgrade.collection.queue;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import com.evolution.upgrade.Serialization.TestBean;
import com.evolution.upgrade.collection.queue.impl.BlockingQueueInterImpl;
import com.evolution.upgrade.utils.JvmUtils;
/**
* @Description: 阻塞队列使用对比 1万次生产消费同时处理效率对比。
* @Author: [Bill] on [2019/8/1 17:30]
*/
public class BlockingQueueInterTest {
private static final int count = 10000;
public static void main(String[] args) {
BlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(count);//需要选择是否需要公平性,所以需要加入的对象实现Comparable比较接口
BlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
BlockingQueue delayQueue = new DelayQueue();//只有在延迟期满时才能从中提取元素,由于需要实现延迟需要加入的对象实现Delayed接口
List<BlockingQueue> queues = Arrays.asList(linkedBlockingQueue, linkedBlockingQueue, arrayBlockingQueue,
priorityBlockingQueue, delayQueue);
queues.forEach(p -> testTemplate(p));
System.out.println("==========================");
queues.forEach(p -> testThreadTemplate(p));
}
private static void testThreadTemplate(BlockingQueue blockingQueue) {
BlockingQueueInter blockingQueueInter = new BlockingQueueInterImpl(blockingQueue);
//预处理
for (int i = 0; i < 10; i++) {
try {
blockingQueueInter.produce(new TestBean());
blockingQueueInter.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//进行GC回收
JvmUtils.restoreJVM();
//多线程,生产消费同时进行
Producer producer = new Producer(blockingQueueInter, new TestBean());
CountDownLatch countDownLatch = new CountDownLatch(count - 1);
Consumer consumer = new Consumer(blockingQueueInter, countDownLatch);
ExecutorService service = Executors.newCachedThreadPool();
service.submit(producer);
service.submit(consumer);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.keepRunning = false;
service.shutdown();
//进行GC回收
JvmUtils.restoreJVM();
}
private static void testTemplate(BlockingQueue blockingQueue) {
try {
//进行GC回收
JvmUtils.restoreJVM();
BlockingQueueInter blockingQueueInter = new BlockingQueueInterImpl(blockingQueue);
//创建对象
TestBean testBean = new TestBean();
long start = System.nanoTime();
for (int i = 0; i < count; i++) {
blockingQueueInter.produce(testBean);
}
for (int i = 0; i < count; i++) {
blockingQueueInter.consume();
}
long nscost = (System.nanoTime()) - start;
System.out.println(blockingQueue.getClass().getSimpleName() + " add total cost=" + nscost
+ "ns , each cost=" + (nscost / count) + "ns");
JvmUtils.restoreJVM();
} catch (Exception ex) {
}
}
static class Producer implements Runnable {
private final BlockingQueueInter blockingQueueInter;
private final TestBean testBean;
public Producer(BlockingQueueInter blockingQueueInter, TestBean testBean) {
this.blockingQueueInter = blockingQueueInter;
this.testBean = testBean;
}
@Override
public void run() {
for (int i = 0; i < count; i++) {
try {
blockingQueueInter.produce(testBean);
} catch (InterruptedException e) {
}
}
}
}
static class Consumer implements Runnable {
private final BlockingQueueInter blockingQueueInter;
private final CountDownLatch countDownLatch;
AtomicInteger cou = new AtomicInteger(0);
volatile boolean keepRunning = true;
long start = System.nanoTime();
public Consumer(BlockingQueueInter blockingQueueInter, CountDownLatch countDownLatch) {
this.blockingQueueInter = blockingQueueInter;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
while (keepRunning) {
if (cou.intValue() + 1 >= count) {
long nscost = (System.nanoTime()) - start;
System.out.println(blockingQueueInter.getName() + " add total cost=" + nscost
+ "ns , each cost=" + (nscost / count) + "ns");
break;
}
Object consume = blockingQueueInter.consume();
if (Objects.nonNull(consume)) {
cou.getAndIncrement();
countDownLatch.countDown();
}
}
} catch (InterruptedException e) {
} finally {
countDownLatch.countDown();
}
}
}
}
结果
LinkedBlockingQueue add total cost=6567496ns , each cost=656ns
ArrayBlockingQueue add total cost=11259355ns , each cost=1125ns
PriorityBlockingQueue add total cost=8507130ns , each cost=850ns
DelayQueue add total cost=7681897ns , each cost=768ns
==========================
LinkedBlockingQueue add total cost=6663490ns , each cost=666ns
ArrayBlockingQueue add total cost=4468268ns , each cost=446ns
PriorityBlockingQueue add total cost=8451825ns , each cost=845ns
DelayQueue add total cost=13924673ns , each cost=1392ns
网友评论