美文网首页java
JAVA队列应用与对比

JAVA队列应用与对比

作者: Bill_Li_GB | 来源:发表于2019-08-01 19:27 被阅读0次

Queue:基本上,队列就是一个先进先出(FIFO)的数据结构

Queue接口与List、Set同一级别,都是集成了Collection接口。

Queue分为:阻塞队列实现和非阻塞队列实现,线程安全和非线程安全之分。

一、非阻塞队列(Queue)

不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue

  • PriorityQueue 类实质上维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。

  • ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大 小,ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。

image.png

当往队列中追加10个对象,耗时对比

PriorityQueue total cost=26072ns , each cost=2607ns

ConcurrentLinkedQueue total cost=39504ns , each cost=3950ns

当往队列里面追加10000个数据时,耗时对比

PriorityQueue total cost=1185094ns , each cost=118ns

ConcurrentLinkedQueue total cost=1157837ns , each cost=115ns

对比发现队列在一次性追加越多数量的数据似乎速度越快

所有队列的性能对比(代码最下面)

PriorityQueue add total cost=2263924ns , each cost=226ns
ConcurrentLinkedQueue add total cost=1170872ns , each cost=117ns
LinkedBlockingQueue add total cost=3611376ns , each cost=361ns
ArrayBlockingQueue add total cost=1749199ns , each cost=174ns
PriorityBlockingQueue add total cost=2289602ns , each cost=228ns
DelayQueue add total cost=2733222ns , each cost=273ns
PriorityQueue remove total cost=1747223ns , each cost=174ns
ConcurrentLinkedQueue remove total cost=1502304ns , each cost=150ns
LinkedBlockingQueue remove total cost=1677698ns , each cost=167ns
ArrayBlockingQueue remove total cost=1133345ns , each cost=113ns
PriorityBlockingQueue remove total cost=1420928ns , each cost=142ns
DelayQueue remove total cost=1947109ns , each cost=194ns

二、阻塞队列(BlockingQueue)

不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。

五个队列所提供的各有不同:

  • ArrayBlockingQueue :一个由数组支持的有界队列。
  • LinkedBlockingQueue :一个由链接节点支持的可选有界队列。
  • PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。
  • DelayQueue :一个由优先级堆支持的、基于时间的调度队列。
  • SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。

  add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
  remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
**  element **返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
  offer 添加一个元素并返回true 如果队列已满,则返回false
  poll 移除并返问队列头部的元素 如果队列为空,则返回null
  peek 返回队列头部的元素 如果队列为空,则返回null
  put 添加一个元素 如果队列满,则阻塞
  take 移除并返回队列头部的元素 如果队列为空,则阻塞

** remove、element、offer 、poll、peek 其实是属于Queue接口 **

** 阻塞队列的操作可以根据它们的响应方式分为以下三类:aad、removee和element操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。当然,在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offer、poll、peek方法。这些方法在无法完成任务时 只是给出一个出错示而不会抛出异常。**

** 注意:poll和peek方法出错进返回null。因此,向队列中插入null值是不合法的 **

** 最后,我们有阻塞操作put和take。put方法在队列满时阻塞,take方法在队列空时阻塞 **

阻塞队列说明

LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。

ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。

PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。

DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。

image.png

---代码:开始 简单的数据操作对比---

/**
 * @Description: 非阻塞队列对比,1万次追加数据,1万次移除数据 1万次生产消费同时处理效率对比
 * @Author: [Bill] on [2019/8/1 15:08]
 */
public class QueueInterTest {
    public static void main(String[] args) {
        //执行1万次追加数据
        int count = 10000;
        
        Queue priorityQueue = new PriorityQueue();//有序列表
        Queue concurrentLinkedQueue = new ConcurrentLinkedQueue();//基于链接节点,线程安全
        
        Queue linkedBlockingQueue = new LinkedBlockingQueue();
        Queue arrayBlockingQueue = new ArrayBlockingQueue<>(count);//需要选择是否需要公平性,所以需要加入的对象实现Comparable比较接口
        Queue priorityBlockingQueue = new PriorityBlockingQueue();
        Queue delayQueue = new DelayQueue();//只有在延迟期满时才能从中提取元素,由于需要实现延迟需要加入的对象实现Delayed接口

        List<Queue> queues = Arrays.asList(priorityQueue, concurrentLinkedQueue, linkedBlockingQueue,arrayBlockingQueue, priorityBlockingQueue, delayQueue);

        queues.forEach(p -> testAddTemplate(p));

        //溢出1万次获取数据
        queues.forEach(p -> testRemoveTemplate(p));

    }

    private static void testAddTemplate(Queue priorityQueue) {
        //进行GC回收
        JvmUtils.restoreJVM();
        //进行测试
        int count = 10000;
        //创建对象
        TestBean testBean = new TestBean();
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            priorityQueue.add(testBean);
        }
        long nscost = (System.nanoTime()) - start;
        System.out.println(priorityQueue.getClass().getSimpleName() + " add total cost=" + nscost + "ns , each cost="
                + (nscost / count) + "ns");
        JvmUtils.restoreJVM();
    }

    private static void testRemoveTemplate(Queue priorityQueue) {
        //进行GC回收
        JvmUtils.restoreJVM();
        //进行测试
        long start = System.nanoTime();
        int size = priorityQueue.size();
        for (int i = 0; i < size; i++) {
            priorityQueue.remove();
        }
        long nscost = (System.nanoTime()) - start;
        System.out.println(priorityQueue.getClass().getSimpleName() + " remove total cost=" + nscost + "ns , each cost="
                + (nscost / size) + "ns");
        JvmUtils.restoreJVM();
    }

---代码:结束 简单的数据操作对比---

阻塞队列使用对比 1万次生产消费同时处理效率对比。

对比代码

package com.evolution.upgrade.collection.queue;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import com.evolution.upgrade.Serialization.TestBean;
import com.evolution.upgrade.collection.queue.impl.BlockingQueueInterImpl;
import com.evolution.upgrade.utils.JvmUtils;

/**
 * @Description: 阻塞队列使用对比 1万次生产消费同时处理效率对比。
 * @Author: [Bill] on [2019/8/1 17:30]
 */
public class BlockingQueueInterTest {

    private static final int count = 10000;

    public static void main(String[] args) {
        BlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(count);//需要选择是否需要公平性,所以需要加入的对象实现Comparable比较接口
        BlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
        BlockingQueue delayQueue = new DelayQueue();//只有在延迟期满时才能从中提取元素,由于需要实现延迟需要加入的对象实现Delayed接口

        List<BlockingQueue> queues = Arrays.asList(linkedBlockingQueue, linkedBlockingQueue, arrayBlockingQueue,
                priorityBlockingQueue, delayQueue);

        queues.forEach(p -> testTemplate(p));

        System.out.println("==========================");

        queues.forEach(p -> testThreadTemplate(p));

    }

    private static void testThreadTemplate(BlockingQueue blockingQueue) {
        BlockingQueueInter blockingQueueInter = new BlockingQueueInterImpl(blockingQueue);
        //预处理
        for (int i = 0; i < 10; i++) {
            try {
                blockingQueueInter.produce(new TestBean());
                blockingQueueInter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //进行GC回收
        JvmUtils.restoreJVM();
        //多线程,生产消费同时进行
        Producer producer = new Producer(blockingQueueInter, new TestBean());
        CountDownLatch countDownLatch = new CountDownLatch(count - 1);
        Consumer consumer = new Consumer(blockingQueueInter, countDownLatch);
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(producer);
        service.submit(consumer);

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        consumer.keepRunning = false;

        service.shutdown();
        //进行GC回收
        JvmUtils.restoreJVM();
    }

    private static void testTemplate(BlockingQueue blockingQueue) {
        try {
            //进行GC回收
            JvmUtils.restoreJVM();
            BlockingQueueInter blockingQueueInter = new BlockingQueueInterImpl(blockingQueue);
            //创建对象
            TestBean testBean = new TestBean();
            long start = System.nanoTime();
            for (int i = 0; i < count; i++) {
                blockingQueueInter.produce(testBean);
            }
            for (int i = 0; i < count; i++) {
                blockingQueueInter.consume();
            }
            long nscost = (System.nanoTime()) - start;
            System.out.println(blockingQueue.getClass().getSimpleName() + " add total cost=" + nscost
                    + "ns , each cost=" + (nscost / count) + "ns");
            JvmUtils.restoreJVM();
        } catch (Exception ex) {

        }
    }

    static class Producer implements Runnable {

        private final BlockingQueueInter blockingQueueInter;
        private final TestBean testBean;

        public Producer(BlockingQueueInter blockingQueueInter, TestBean testBean) {
            this.blockingQueueInter = blockingQueueInter;
            this.testBean = testBean;
        }

        @Override
        public void run() {
            for (int i = 0; i < count; i++) {
                try {
                    blockingQueueInter.produce(testBean);
                } catch (InterruptedException e) {

                }
            }
        }
    }

    static class Consumer implements Runnable {

        private final BlockingQueueInter blockingQueueInter;
        private final CountDownLatch countDownLatch;

        AtomicInteger cou = new AtomicInteger(0);

        volatile boolean keepRunning = true;

        long start = System.nanoTime();

        public Consumer(BlockingQueueInter blockingQueueInter, CountDownLatch countDownLatch) {
            this.blockingQueueInter = blockingQueueInter;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                while (keepRunning) {
                    if (cou.intValue() + 1 >= count) {
                        long nscost = (System.nanoTime()) - start;
                        System.out.println(blockingQueueInter.getName() + " add total cost=" + nscost
                                + "ns , each cost=" + (nscost / count) + "ns");
                        break;
                    }
                    Object consume = blockingQueueInter.consume();
                    if (Objects.nonNull(consume)) {
                        cou.getAndIncrement();
                        countDownLatch.countDown();
                    }
                }
            } catch (InterruptedException e) {

            } finally {
                countDownLatch.countDown();
            }
        }
    }

}

结果

LinkedBlockingQueue add total cost=6567496ns , each cost=656ns
ArrayBlockingQueue add total cost=11259355ns , each cost=1125ns
PriorityBlockingQueue add total cost=8507130ns , each cost=850ns
DelayQueue add total cost=7681897ns , each cost=768ns
==========================
LinkedBlockingQueue add total cost=6663490ns , each cost=666ns
ArrayBlockingQueue add total cost=4468268ns , each cost=446ns
PriorityBlockingQueue add total cost=8451825ns , each cost=845ns
DelayQueue add total cost=13924673ns , each cost=1392ns

相关文章

  • JAVA队列应用与对比

    Queue:基本上,队列就是一个先进先出(FIFO)的数据结构 Queue接口与List、Set同一级别,都是集成...

  • 队列

    队列特性 对比队列和栈 基于数组的队列 对比队列学习循环队列 循环队列难点 阻塞队列 并发队列 应用:线程池中拒绝...

  • Redis应用-布隆过滤器

    系列文章Redis应用-分布式锁Redis应用-异步消息队列与延时队列Redis应用-位图Redis应用-Hype...

  • Redis应用-Geo

    系列文章Redis应用-分布式锁Redis应用-异步消息队列与延时队列Redis应用-位图Redis应用-Hype...

  • JAVA并发梳理(四) 各种安全队列

    在 Java 多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线...

  • Java实现队列

    队列是一种先进先出的数据结构,和栈刚好相反,队列在算法中也应用广泛。本文,我们主要探讨Java实现队列。 队列 队...

  • 第二十一章、java线程安全队列

    在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。 Java提供的线程安全的Que...

  • java队列Queue

    在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queu...

  • Java线程安全队列

    在java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queu...

  • 并发队列ConcurrentLinkedQueue和阻塞队列Li

    在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安...

网友评论

    本文标题:JAVA队列应用与对比

    本文链接:https://www.haomeiwen.com/subject/wcdcdctx.html