美文网首页
Java集合之Queue

Java集合之Queue

作者: felixfeijs | 来源:发表于2020-07-06 17:10 被阅读0次

    Java集合之Queue

    Queue关系图如下

    java-集合之Queue关系图.png
    • 从如上图可看出,LinkedList具有List的特性

    Queue实现类之LinkedList

    1. 从如下代码可看出以下特点
      • 遵循先进先出原则
      • 可指定索引进行插入
      public static void main(String[] args) {
          LinkedList<String> linkedList = new LinkedList<>();
          linkedList.add("felix");
          linkedList.add("fei");
          linkedList.add(null);
          linkedList.add(1,"peng"); //指定对应的索引进行插入
          linkedList.addFirst("10"); //将元素添加到第一个位置 失败抛出异常
          linkedList.addLast("10"); //将元素添加到最后一个位置 失败抛出异常
          linkedList.removeFirst(); //删除第一个元素
          //linkedList.remove(index); // 删除指定索引的元素
          linkedList.offerFirst("10"); //将元素添加到第一个位置 返回Boolean值
          linkedList.set(2,"10"); //将第三个元素设置为10,效率低
          linkedList.get(2); //获取第三个元素,效率低
          for (Iterator<String> it = linkedList.iterator();it.hasNext();){
              System.out.println(it.next());
          }  
      
    2. LinkedList其他特性
      • 线程不安全
      • 实现了Cloneable接口,可进行clone()
      • 实现了java.io.Serializable接口,支持序列化
      • 数据结构为双向链表
      • 插入和删除速度快
    3. 源码分析-1
        private static class Node<E> {
            E item;
            Node<E> next;
            Node<E> prev;
    
            Node(Node<E> prev, E element, Node<E> next) {
                this.item = element;
                this.next = next;
                this.prev = prev;
            }
        }
    
    • 从如上源码可看出,LinkedList包含了一个内部类Node,它包含了当前节点的值,上一个节点,下一个节点.

    Queue实现类之ConcurrentLinkedQueue

    1. 简单介绍
      • 链表式队列
      • ConcurrentLinkedQueue是支持高性能的Queue
    2. 常用方法
        public static void main(String[] args) {
            ConcurrentLinkedQueue conLinkQueue = new ConcurrentLinkedQueue();
            boolean resAdd = conLinkQueue.add("felix"); //将指定的元素插入到此队列中(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用空间,则抛出 IllegalStateException。
            boolean resOffer = conLinkQueue.offer("fei"); //将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
            Object poll = conLinkQueue.poll();//获取并移除此队列的头,如果此队列为空,则返回 null
            Object peek = conLinkQueue.peek();//获取但不移除此队列的头;如果此队列为空,则返回 null。
        }
    
    1. 特性说明
      • 通过无锁方式实现高并发状态下的高性能
      • 遵循FIFO原则
      • 不允许NULL

    Queue之BlockingQueue接口

    BlockingQueue之ArrayBlockingQueue
    1. 简单介绍

      • 数组实现的一个阻塞队列
    2. 常用方法

      1. 常用方法1
          public static void main(String[] args) throws InterruptedException {
          ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4,true);
          arrayBlockingQueue.add("fei"); //向队列放入元素
          arrayBlockingQueue.add("peng");
          System.out.println("add之后的队列="+arrayBlockingQueue);
          Object peek = arrayBlockingQueue.peek(); // 获取出队列第一个元素,返回结果为获取的元素
          System.out.println("peek取出的元素="+peek.toString());
          System.out.println("peek之后的队列="+arrayBlockingQueue);
          arrayBlockingQueue.put("felix"); //向队列放入元素
          System.out.println("put之后的队列="+arrayBlockingQueue);
          Object take = arrayBlockingQueue.take();// 取出队列第一个元素,返回结果为取出的元素
          System.out.println("take取出的元素="+take.toString());
          System.out.println("take之后的队列="+arrayBlockingQueue);
          arrayBlockingQueue.put("1"); // 第三个元素 插入成功
          arrayBlockingQueue.put("2"); // 第四个元素 插入成功
          //arrayBlockingQueue.take();
          arrayBlockingQueue.put("3"); // 第五个元素,插入失败,因为数组定长为4,插入第五个必须取出一个元素
          System.out.println("put之后的队列="+arrayBlockingQueue);
      }
      
      • 控制台结果
      add之后的队列=[fei, peng]
      peek取出的元素=fei
      peek之后的队列=[fei, peng]
      put之后的队列=[fei, peng, felix]
      take取出的元素=fei
      take之后的队列=[peng, felix]
      
      1. 常用方法二
          public static void main(String[] args) throws InterruptedException {
          ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(4,true);
          arrayBlockingQueue.offer("felix",10, TimeUnit.DAYS); // 向队列放入10天后过期清理的元素
          arrayBlockingQueue.offer("fei",10, TimeUnit.DAYS);
          System.out.println("offer后的队列="+arrayBlockingQueue);
          arrayBlockingQueue.poll(10,TimeUnit.DAYS); // 取出队列的第一个元素
          System.out.println("poll后的队列="+arrayBlockingQueue);
      }
      
      • 控制台结果
      offer后的队列=[felix, fei]
      poll后的队列=[fei]
      
      1. 常用添加方法比较
      方法名 队列满时处理方式 方法返回值
      add(E e) 抛出"Queue full"异常 boolean
      offer(E e) 返回flase boolean
      put(E e) 线程阻塞,直到中断或被唤醒 void
      offer(E e,long timeout,TimeUnit unit) 在规定时间内重试,超过规定时间返回false boolean
      1. 常用取出方法比较
      方法名 队列空时处理方式 方法返回值
      peek() 返回null E
      poll() 返回null E
      take() 线程阻塞,直到中断或被唤醒 E
      poll(E e,long timeout,TimeUnit unit) 在规定时间内重试,超过规定时间返回null E
    3. 特性说明

      • 数组格式
      • 阻塞队列(可重入锁)
    4. 源码分析

      1. 构造方法一
          public ArrayBlockingQueue(int capacity) {
          this(capacity, false);
      }
      
      • 从以上源码可看出该队列是定长的
      1. 构造方法二
          public ArrayBlockingQueue(int capacity, boolean fair) {
          if (capacity <= 0)
              throw new IllegalArgumentException();
          this.items = new Object[capacity];
          lock = new ReentrantLock(fair);
          notEmpty = lock.newCondition();
          notFull =  lock.newCondition();
      }
      
      • 从以上源码可看出,第二个参数为是否公平,它代表的是可重入锁的公平和非公平
      • 公平:按照现场申请的先后顺序获得锁,会带来系统额外的开销,相当于每一个线程都有一个排队时间,性能降低
      • 非公平:不管顺序,直接拿
      1. 构造方法三
          public ArrayBlockingQueue(int capacity, boolean fair,
                                Collection<? extends E> c) {
          this(capacity, fair);
      
          final ReentrantLock lock = this.lock;
          lock.lock(); // Lock only for visibility, not mutual exclusion
          try {
              int i = 0;
              try {
                  for (E e : c) {
                      checkNotNull(e);
                      items[i++] = e;
                  }
              } catch (ArrayIndexOutOfBoundsException ex) {
                  throw new IllegalArgumentException();
              }
              count = i;
              putIndex = (i == capacity) ? 0 : i;
          } finally {
              lock.unlock();
          }
      }
      
      • 第三个参数定义了集合的类型
    BlockingQueue之LinkedBlockingQueue
    1. 简单介绍
      • 基于链表的并发队列.
    2. 常用方法
          public static void main(String[] args) throws InterruptedException {
          LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(4);
          boolean offerFlag = linkedBlockingQueue.offer("felix"); //元素入队方法,返回boolean值
          System.out.println("offer之后的队列="+linkedBlockingQueue); // 元素入队方法,返回空值,抛出InterruptedException异常
          linkedBlockingQueue.put("felix"); //
          System.out.println("put之后的队列="+linkedBlockingQueue);
          linkedBlockingQueue.offer("fei");
          linkedBlockingQueue.offer("peng");
          linkedBlockingQueue.offer("felixOne"); // 队列初始化4个元素,第五次元素入队时队列已满,阻塞等待
          System.out.println("offer之后的队列="+linkedBlockingQueue);
          Object poll = linkedBlockingQueue.poll(); // 拿出并返回第一个元素
          System.out.println("poll出来的元素="+poll.toString());
          System.out.println("poll之后的队列="+linkedBlockingQueue);
          boolean removeFlag = linkedBlockingQueue.remove("fei"); // 移除指定元素
          System.out.println("remove之后的队列="+linkedBlockingQueue);
          linkedBlockingQueue.clear(); // 清空队列
          System.out.println("清空后的队列="+linkedBlockingQueue);
      }
      
      • 控制台结果
      offer之后的队列=[felix]
      

    put之后的队列=[felix, felix]
    offer之后的队列=[felix, felix, fei, peng]
    poll出来的元素=felix
    poll之后的队列=[felix, fei, peng]
    remove之后的队列=[felix, peng]
    清空后的队列=[]
    ```

    1. 特性说明
    • 无界队列,需要指定队列大小
    • 链表结构
    • 线程安全
    1. 源码分析
      1. 从下面构造方法可看出该队列的默认无界
      public LinkedBlockingQueue() {
      // 默认大小为Integer.MAX_VALUE
      this(Integer.MAX_VALUE);
      }
      
      • 缺点:如果不指定队列的容量大小,如果存在添加速度大于删除速度,会出现内存溢出的情况
      1. 从下面put和take方法中看出内部使用了takeLock和putLock对并发进行控制,添加和操作并不是互斥操作,可以同时进行,这样可以提高吞吐量.
          /**
      * 节点类,用于存储数据
      */
      static class Node<E> {
          E item;
          Node<E> next;
      
          Node(E x) { item = x; }
      }
      
      /** 阻塞队列的大小,默认为Integer.MAX_VALUE */
      private final int capacity;
      
      /** 当前阻塞队列中的元素个数 */
      private final AtomicInteger count = new AtomicInteger();
      
      /**
      * 阻塞队列的头结点
      */
      transient Node<E> head;
      
      /**
      * 阻塞队列的尾节点
      */
      private transient Node<E> last;
      
      /** 获取并移除元素时使用的锁,如take, poll, etc */
      private final ReentrantLock takeLock = new ReentrantLock();
      
      /** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
      private final Condition notEmpty = takeLock.newCondition();
      
      /** 添加元素时使用的锁如 put, offer, etc */
      private final ReentrantLock putLock = new ReentrantLock();
      
      /** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
      private final Condition notFull = putLock.newCondition();
      
      1. 入队方法之put(E e)
      public void put(E e) throws InterruptedException {
      if (e == null) throw new NullPointerException();
      int c = -1;
      Node<E> node = new Node<E>(e);
      final ReentrantLock putLock = this.putLock;
      final AtomicInteger count = this.count;
      // 获取锁中断
      putLock.lockInterruptibly();
      try {
          //判断队列是否已满,如果已满阻塞等待
          while (count.get() == capacity) {
              notFull.await();
          }
          // 把node放入队列中
          enqueue(node);
          c = count.getAndIncrement();
          // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
          if (c + 1 < capacity)
              notFull.signal();
      } finally {
          putLock.unlock();
      }
      // 如果队列中有一条数据,唤醒消费线程进行消费
      if (c == 0)
          signalNotEmpty();
      }
      
      1. 入队方法之offer(E e)
      public boolean offer(E e) {
      if (e == null) throw new NullPointerException();
      final AtomicInteger count = this.count;
      if (count.get() == capacity)
          return false;
      int c = -1;
      Node<E> node = new Node<E>(e);
      final ReentrantLock putLock = this.putLock;
      putLock.lock();
      try {
          // 队列有可用空间,放入node节点,判断放入元素后是否还有可用空间,
          // 如果有,唤醒下一个添加线程进行添加操作。
          if (count.get() < capacity) {
              enqueue(node);
              c = count.getAndIncrement();
              if (c + 1 < capacity)
                  notFull.signal();
          }
      } finally {
          putLock.unlock();
      }
      if (c == 0)
          signalNotEmpty();
      return c >= 0;
      }
      
      public boolean offer(E e, long timeout, TimeUnit unit)
          throws InterruptedException {
      
      if (e == null) throw new NullPointerException();
      long nanos = unit.toNanos(timeout);
      int c = -1;
      final ReentrantLock putLock = this.putLock;
      final AtomicInteger count = this.count;
      putLock.lockInterruptibly();
      try {
          // 等待超时时间nanos,超时时间到了返回false
          while (count.get() == capacity) {
              if (nanos <= 0)
                  return false;
              nanos = notFull.awaitNanos(nanos);
          }
          enqueue(new Node<E>(e));
          c = count.getAndIncrement();
          if (c + 1 < capacity)
              notFull.signal();
      } finally {
          putLock.unlock();
      }
      if (c == 0)
          signalNotEmpty();
      return true;
      }
      
      1. 入队方法之offer(E e,long timeout,TimeUnit unit)
      public boolean offer(E e, long timeout, TimeUnit unit)
          throws InterruptedException {
      
      if (e == null) throw new NullPointerException();
      long nanos = unit.toNanos(timeout);
      int c = -1;
      final ReentrantLock putLock = this.putLock;
      final AtomicInteger count = this.count;
      putLock.lockInterruptibly();
      try {
          // 等待超时时间nanos,超时时间到了返回false
          while (count.get() == capacity) {
              if (nanos <= 0)
                  return false;
              nanos = notFull.awaitNanos(nanos);
          }
          enqueue(new Node<E>(e));
          c = count.getAndIncrement();
          if (c + 1 < capacity)
              notFull.signal();
      } finally {
          putLock.unlock();
      }
      if (c == 0)
          signalNotEmpty();
      return true;
      }
      
      1. 出队方法之E take();
      public E take() throws InterruptedException {
      E x;
      int c = -1;
      final AtomicInteger count = this.count;
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lockInterruptibly();
      try {
          // 队列为空,阻塞等待
          while (count.get() == 0) {
              notEmpty.await();
          }
          x = dequeue();
          c = count.getAndDecrement();
          // 队列中还有元素,唤醒下一个消费线程进行消费
          if (c > 1)
              notEmpty.signal();
      } finally {
          takeLock.unlock();
      }
      // 移除元素之前队列是满的,唤醒生产线程进行添加元素
      if (c == capacity)
          signalNotFull();
      return x;
      }
      
      1. 出队方法之E poll();
      public E poll() {
      final AtomicInteger count = this.count;
      if (count.get() == 0)
          return null;
      E x = null;
      int c = -1;
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lock();
      try {
          if (count.get() > 0) {
              x = dequeue();
              c = count.getAndDecrement();
              if (c > 1)
                  notEmpty.signal();
          }
      } finally {
          takeLock.unlock();
      }
      if (c == capacity)
          signalNotFull();
      return x;
      }
      
      1. 获取元素方法
      public E peek() {
      if (count.get() == 0)
          return null;
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lock();
      try {
          Node<E> first = head.next;
          if (first == null)
              return null;
          else
              return first.item;
      } finally {
          takeLock.unlock();
      }
      }
      
      1. 删除元素方法
      public boolean remove(Object o) {
      if (o == null) return false;
      // 两个lock全部上锁
      fullyLock();
      try {
          // 从head开始遍历元素,直到最后一个元素
          for (Node<E> trail = head, p = trail.next;
               p != null;
               trail = p, p = p.next) {
              // 如果找到相等的元素,调用unlink方法删除元素
              if (o.equals(p.item)) {
                  unlink(p, trail);
                  return true;
              }
          }
          return false;
      } finally {
          // 两个lock全部解锁
          fullyUnlock();
      }
      }
      
      void fullyLock() {
      putLock.lock();
      takeLock.lock();
      }
      
      void fullyUnlock() {
      takeLock.unlock();
      putLock.unlock();
      }
      
    BlockingQueue之PriorityBlockingQueue
    1. 简单介绍
    • 是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列,也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
    1. 常用方法
        public static void main(String[] args) throws InterruptedException {
            PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue(4);
            boolean offerFlag = priorityBlockingQueue.offer("4"); //元素入队方法,返回boolean值
            System.out.println("offer之后的队列="+priorityBlockingQueue); // 元素入队方法,返回空值,抛出InterruptedException异常
            priorityBlockingQueue.put("5"); //
            System.out.println("put之后的队列="+priorityBlockingQueue);
            priorityBlockingQueue.offer("2");
            priorityBlockingQueue.offer("11");
            priorityBlockingQueue.offer("3"); // 队列初始化4个元素,可自行扩容
            System.out.println("offer之后的队列="+priorityBlockingQueue);
            Object poll = priorityBlockingQueue.poll(); // 拿出并返回第一个元素
            System.out.println("poll出来的元素="+poll.toString());
            System.out.println("poll之后的队列="+priorityBlockingQueue);
            boolean removeFlag = priorityBlockingQueue.remove("11"); // 移除指定元素
            System.out.println("remove之后的队列="+priorityBlockingQueue);
            priorityBlockingQueue.clear(); // 清空队列
            System.out.println("清空后的队列="+priorityBlockingQueue);
        }
    
    • 控制台结果
    offer之后的队列=[4]
    put之后的队列=[4, 5]
    offer之后的队列=[11, 2, 4, 5, 3]
    poll出来的元素=11
    poll之后的队列=[2, 3, 4, 5]
    remove之后的队列=[2, 3, 4, 5]
    清空后的队列=[]
    
    1. 特性说明
    • 无界阻塞队列
    • 自行扩容
    • 默认自然序升序排列
    1. 源码分析
      1. 从之下源码可看出初始化容量为11
          /**
       * Default array capacity.
       */
      private static final int DEFAULT_INITIAL_CAPACITY = 11;
      
      1. 从以下源码可看出如果入参元素为null,则抛出异常,加锁,如果元素的数量等于数组的容量时,就要扩容;如果数组还有空间时,就会入队,入队会判断是否初始化比较器,如果有比较器,则按照比较器的就行排序,没有的话,则元素必须具有可比较性
      public boolean offer(E e) {
         if (e == null)
             throw new NullPointerException();
         final ReentrantLock lock = this.lock;
         lock.lock();
         int n, cap;
         Object[] array;
         while ((n = size) >= (cap = (array = queue).length))
             tryGrow(array, cap);
         try {
             Comparator<? super E> cmp = comparator;
             if (cmp == null)
                 siftUpComparable(n, e, array);
             else
                 siftUpUsingComparator(n, e, array, cmp);
             size = n + 1;
             notEmpty.signal();
         } finally {
             lock.unlock();
         }
         return true;
      }
      
      1. 通过源码看扩容
      private void tryGrow(Object[] array, int oldCap) {
          lock.unlock(); // must release and then re-acquire main lock
          Object[] newArray = null;
          if (allocationSpinLock == 0 &&
              UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                       0, 1)) {
              try {
                  int newCap = oldCap + ((oldCap < 64) ?
                                         (oldCap + 2) : // grow faster if small
                                         (oldCap >> 1));
                  if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                      int minCap = oldCap + 1;
                      if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                          throw new OutOfMemoryError();
                      newCap = MAX_ARRAY_SIZE;
                  }
                  if (newCap > oldCap && queue == array)
                      newArray = new Object[newCap];
              } finally {
                  allocationSpinLock = 0;
              }
          }
          if (newArray == null) // back off if another thread is allocating
              Thread.yield();
          lock.lock();
          if (newArray != null && queue == array) {
              queue = newArray;
              System.arraycopy(array, 0, newArray, 0, oldCap);
          }
      }
      
      • 从扩容源码可看出如果容量在64以下,每次扩容增加2个,如果大于等于64,则每次扩容扩大50%。如果超过最大容量,oldCap距离最大容量很近1,没法扩容,就会抛出 OutOfMemoryError异常,如果还没达到距离1,新的数组容量就时最大数组容量。
      1. 通过源码看出队
      public E poll() {
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
             return dequeue();
         } finally {
             lock.unlock();
         }
      }
      private E dequeue() {
         int n = size - 1;
         if (n < 0)
             return null;
         else {
             Object[] array = queue;
             E result = (E) array[0];
             E x = (E) array[n];
             array[n] = null;
             Comparator<? super E> cmp = comparator;
             if (cmp == null)
                 siftDownComparable(0, x, array, n);
             else
                 siftDownUsingComparator(0, x, array, n, cmp);
             size = n;
             return result;
         }
      }
      
      • 从出队源码可看出,如果队列为空则返回null,如果不为空则返回第一个元素,然后重新排序。
      1. 从源码看remove方法
      public boolean remove(Object o) {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              int i = indexOf(o);
              if (i == -1)
                  return false;
              removeAt(i);
              return true;
          } finally {
              lock.unlock();
          }
      }
      
      • 加锁
      • 根据对象找到在数组中的索引
      • 根据索引删除元素

    各实现类对比图

    java-LinkedList、ConcurrentLinkedQueue、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue.jpg

    相关文章

      网友评论

          本文标题:Java集合之Queue

          本文链接:https://www.haomeiwen.com/subject/bakgqktx.html