美文网首页JUC并发包
JUC下的阻塞队列-ArrayBlockingQueue

JUC下的阻塞队列-ArrayBlockingQueue

作者: 于情于你 | 来源:发表于2021-04-12 22:34 被阅读0次

        ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。下面是主要结构。
        大概思想就是使用items数组存放元素,使用ReentrantLock和Condition实现线程安全。
        每次入队的时候都会通过putIndex找到新元素应该存放的下标,如果putIndex到头了,那么再从0开始,循环使用。入队完成后会通知取元素的线程拿元素。
        每次出队的时候都会通过takeIndex找到这次要取的元素的下标,如果takeIndex到头了,那么也从0开始,循环使用。出队完成后会通知存放元素的线程继续存元素。

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = -817911632652898426L;
    
        // 队列的数组
        final Object[] items;
    
       // 调用take, poll, peek or remove方法,元素的位置
        int takeIndex;
    
       // 调用 put, offer, or add方法,元素的位置
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        // 等待拿元素线程的等待条件队列
        private final Condition notEmpty;
    
         // 等待存元素线程的等待条件队列
        private final Condition notFull;
    
        /**
         * Shared state for currently active iterators, or null if there
         * are known not to be any.  Allows queue operations to update
         * iterator state.
         */
        transient Itrs itrs = null;
    

    入队API:

    add,如果队列满了,则会抛出IllegalStateException
    public boolean add(E e) {
            return super.add(e);
        }
    
    
    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    
    
    public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 队列满了,返回false
                if (count == items.length)
                    return false;
                else {
                // 否则入队
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
      private void enqueue(E x) {
            // ArrayBlockingQueue存放元素的数组
            final Object[] items = this.items;
            // 把新元素放到该放的index
            items[putIndex] = x;
            // 如果加完这个元素队列满了,那么把putIndex置为0
            if (++putIndex == items.length)
                putIndex = 0;
            // 元素计数
            count++;
            // 把等待拿元素的线程,从AQS的等待条件队列拿到同步队列,也就是告诉消费线程,可以消费了
            notEmpty.signal();
        }
    
    offer,如果队列满了,则返回false,入队成功返回true,不会抛IllegalStateException

    源码同上

    put,将指定的元素插入此队列的尾部,如果队列已满,则等待空间。
     public void put(E e) throws InterruptedException {
            checkNotNull(e);
            // 加一个可中断的锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
              // 如果队列满了,则等待,加入notFull等待条件队列。
                while (count == items.length)
                    notFull.await();
              // 直到有空间了,入队
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    

    出队API:

    poll,如果队列为空,直接返回null,不等待。并且删除元素
    public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {  
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
     private E dequeue() {
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
          // 在可以拿元素的位置takeIndex,获取元素
            E x = (E) items[takeIndex];
            // 把takeIndex位置的元素置为null
            items[takeIndex] = null;
            // takeIndex到头了,那么再从0开始
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
    
            // 通知存放元素的线程,可以存放元素了
            notFull.signal();
            return x;
        }
    
    take,如果队列为空,则加入到条件等待队列。并且删除元素
    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    peek,如果队列为空,返回null。不删除元素
    public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return itemAt(takeIndex); // null when queue is empty
            } finally {
                lock.unlock();
            }
        }
    

    相关文章

      网友评论

        本文标题:JUC下的阻塞队列-ArrayBlockingQueue

        本文链接:https://www.haomeiwen.com/subject/pldflltx.html