美文网首页
Java 中ArrayBlockingQueue 用的是循环数组

Java 中ArrayBlockingQueue 用的是循环数组

作者: 多多的大白 | 来源:发表于2022-01-30 23:48 被阅读0次

    我们都知道Queue是一种操作线性表,可以用数组、链表来实现。
    Queue的特点是先进先出。

    存在问题:
    如果我们使用普通的数组来实现Queue,一定会存在一个问题,每次出对的时候,从数组的第一位开始取,那么只要出对完成,就会涉及到整个数组搬迁,会导致时间最差复杂度成为O(n) 。单纯从设计Queue来讲,我觉得是应该要避免这样的搬迁操作。

    循环数组

    我们都知道数组是一块连续的内存空间。但是循环数组,确实不多见。如何理解?

    环形数组

    如上图,是一个大神的画的,看着挺复杂的,实现起来并没有那么复杂,只要抓住核心tail和head 即可,还有一个很重要的一个算法 (tail + 1) % n ,如下代码。

    
    public class CircularQueue {
      // 数组:items,数组大小:n
      private String[] items;
      private int n = 0;
      // head表示队头下标,tail表示队尾下标
      private int head = 0;
      private int tail = 0;
    
      // 申请一个大小为capacity的数组
      public CircularQueue(int capacity) {
        items = new String[capacity];
        n = capacity;
      }
    
      // 入队
      public boolean enqueue(String item) {
        // 队列满了
        if ((tail + 1) % n == head) return false;
        items[tail] = item;
        tail = (tail + 1) % n;
        return true;
      }
    
      // 出队
      public String dequeue() {
        // 如果head == tail 表示队列为空
        if (head == tail) return null;
        String ret = items[head];
        head = (head + 1) % n;
        return ret;
      }
    }
    

    这是一段使用循环数组实现简单Queue的代码,这样实现完全解决了上面的问题,出对的时候时间复杂度从O(n) 降成O(1)。

    如果仔细看代码你会发现,在执行到临界点的时候,即队列满了,会存在有一个位置为空,内存如此宝贵怎么能忍。
    有个简单操作,show you code。

    
    public class CircularQueue {
    
      // 数组:items,数组大小:n
      private String[] items;
      private int n = 0;
      // 定义数组的长度
      private int size = 0;
      // head表示队头下标,tail表示队尾下标
      private int head = 0;
      private int tail = 0;
    
      // 申请一个大小为capacity的数组
      public CircularQueue(int capacity) {
        items = new String[capacity];
        n = capacity;
      }
    
      // 入队
      public boolean enqueue(String item) {
    
        if (size == n) {
          return false;
        }
        // 队列满了
        items[tail] = item;
        tail = (tail + 1) % n;
        size++;
        return true;
      }
    
      // 出队
      public String dequeue() {
        // 如果head == tail 表示队列为空
        if (size == 0) {
          return null;
        }
        String ret = items[head];
        head = (head + 1) % n;
        size--;
        return ret;
      }
    }
    
    

    上述简单介绍了循环队列,设计确认比较巧妙,对于一些底层的编码,什么都要节省,如果有方式方法能节省就节省吧,毕竟现在我们都是天天把节省成本挂在嘴边。

    重点看下ArrayBlockingQueue

    之前一直用ArrayBlockingQueue,也看过几次源码,看了也忘记了。一直没有抓住核心点。这次看到了循环数组突然对ArrayBlockingQueue有些好奇。

    是不是也是循环数组的方式来实现Queue
    我们看下ArrayBlockingQueue源码,其他方法可以先不着急去看,先去看核心的方法enqueue和dequeue

     /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
     private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }
    
        /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }
    

    从上述代码来看,涉及到几个成员变量和ArrayBlockingQueue构造函数

        /** The queued items */
        final Object[] items;
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;
    
        /** items index for next put, offer, or add */
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
      /** Condition for waiting puts */
        private final Condition notFull;
    
      /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed)
         * capacity and the specified access policy.
         *
         * @param capacity the capacity of this queue
         * @param fair if {@code true} then queue accesses for threads blocked
         *        on insertion or removal, are processed in FIFO order;
         *        if {@code false} the access order is unspecified.
         * @throws IllegalArgumentException if {@code capacity < 1}
         */
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
    

    其中takeIndex、putIndex、count 都是int 类型,默认为0。

    notEmpty 表示队列为空的时候,出对操作的线程状态变成等待状态,并释放锁,入队操作的时候,会通知唤醒等待时间最长的线程。

    notFull 表示队列为满的时候,入队操作线程状态变成等待状态,并释放锁,出对操作的时候,会通知唤醒等待时间最长的线程。

    从ArrayBlockingQueue 中enqueue 和dequeue 源码看很容易理解使用的就是循环数组来实现的,如下代码

    if (++putIndex == items.length)
        putIndex = 0;
    
    
    if (++takeIndex == items.length)
      takeIndex = 0;
    

    光看enqueue 和dequeue 源码发现其实整个逻辑非常不严谨,不严谨也没关系,主要看他们俩都是私有方法。更何况有个核心成员变量 count还没有涉及到。

    ArrayBlockingQueue 对外提供的出对和入对操作都是成对,我们一一看来

     /**
         * Inserts the specified element at the tail of this queue if it is
         * possible to do so immediately without exceeding the queue's capacity,
         * returning {@code true} upon success and {@code false} if this queue
         * is full.  This method is generally preferable to method {@link #add},
         * which can fail to insert an element only by throwing an exception.
         *
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    offer和poll方法中 通过count 来 判断队列是否满了,或者是否为空队列,前提都加了Lock,为了保证线程安全,出入对都统一锁了。

    到这里可能会有疑问阻塞去哪了?
    我们再看另外一对的出入对方法

     /**
         * Inserts the specified element at the tail of this queue, waiting
         * for space to become available if the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    

    notFull.await() 和 notEmpty.await() 使用上了,表示这俩个操作是阻塞的,很容易理解。
    这里面有一个细节点需要注意

     while (count == items.length)
              notFull.await();
     while (count == 0)
             notEmpty.await();
    

    我刚开始看的时候,我也有疑问为什么使用while而非if判断,其实是做了双重保证,防止过早或者意外通知唤醒。

    再看下最后一对

     /**
         * Inserts the specified element at the tail of this queue, waiting
         * up to the specified wait time for space to become available if
         * the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }
     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    理解了上面俩对出入对的方法,看到这对也是比较容易理解,加了阻塞时间而已。

    总结

    • 在我们经常使用基于数组的队列,里面也是使用循环数组的思想
    • 循环数组的核心,就是一个圆环,通过tail 和head指针来定义出入对的索引。
    • 循环数组的实现,有很多种,只要理解了思想,怎么实现就都可以。
    • ArrayBlockingQueue 是一种阻塞队列,也提供非阻塞的线程安全出入对方法。
    • ArrayBlockingQueue 阻塞实现是通过lock Condition来实现的。

    今天是辛丑年最后一天,除夕,回不了自己的家乡,只能待在杭城。
    在此祝所有的亲朋好友除夕安康,来年虎虎生威!

    相关文章

      网友评论

          本文标题:Java 中ArrayBlockingQueue 用的是循环数组

          本文链接:https://www.haomeiwen.com/subject/oicckrtx.html