美文网首页
Java1.8-ArrayBlockingQueue源码学习(五

Java1.8-ArrayBlockingQueue源码学习(五

作者: 骑着乌龟去看海 | 来源:发表于2019-02-28 10:58 被阅读31次

    一、概述

      ArrayBlockingQueue是一个有界的阻塞队列,底层通过数组来实现,会按照常规的先进先出(FIFO)的原则来操作队列,元素从队尾入队,从队头出队。同样,如果在获取队列元素的时候队列为空,则会阻塞;在往队列添加元素的时候,如果队列已满,则会阻塞;

    • 该队列不是寻常意义上的队列,而是一个循环队列;
    • 队列的容量固定,一旦创建,队列的容量就无法修改;
    • 该队列支持可选的访问策略,默认是非公平的访问策略;

    接下来我们来看一下该类的实现源码。

    二、源码

    1. 继承结构及构造方法
    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    

    可以看到,继承结构都比较常规,继承自AbstractQueue,实现了BlockingQueue,并且实现了序列化接口。

    然后来看一下它的构造方法:

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    

    先来看一下它前两个构造方法。因为是有界队列,所以要制定队列的容量大小;并且由于是通过ReentrantLock来实现锁,而ReentrantLock有公平锁和非公平锁之分,因此要制定对应的访问策略,默认是非公平锁;对于公平锁而言,先阻塞的会先获取到锁;而对于非公平锁,则是进行抢占式获取。

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        // 构造方法
        this(capacity, fair);
    
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                // 遍历集合
                for (E e : c) {
                    // 校验集合中元素不能是null
                    checkNotNull(e);
                    // 往数组中添加元素
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                // 如果初始化容量小于传入集合容量,异常
                throw new IllegalArgumentException();
            }
            // 设置队列数量
            count = i;
            // 初始化入队的索引
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }
    

    最后一个构造方法是通过一个给定的集合来创建队列,这里会把集合中的元素挨个添加到队列中,同时初始化的容量不能小于所给定集合的容量。

    2. 属性

    接下来,我们来看一下该类的一些属性:

    /** 保存元素的数组 */
    final Object[] items;
    
    /** 获取元素(出队)的索引 */
    int takeIndex;
    
    /** 添加元素(入队)的索引 */
    int putIndex;
    
    /** 队列中元素的数量 */
    int count;
    
    /** 可重入锁 */
    final ReentrantLock lock;
    
    /** 获取元素(出队)的Condition条件 */
    private final Condition notEmpty;
    
    /** 添加元素(入队)的Condition条件 */
    private final Condition notFull;
    
    /**
     * 队列的迭代器
     */
    transient Itrs itrs = null;
    

    可以看到,ArrayBlockingQueue定义了一个可重入锁,并且定义了两个Condition条件,分别用于出队和入队的时候进行阻塞。

    3. 方法
    3.1 add方法

    add方法表示入队,将数据插入到队尾,入队成功返回true;如果队列已满,抛出异常。最终还会间接通过offer方法来实现:

    public boolean add(E e) {
        return super.add(e);
    }
    
    // 继承类AbstractQueue
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    
    3.2 offer方法

    而对offer方法而言,如果队列已满,返回false,不会抛出异常:

    public boolean offer(E e) {
        // 非空校验
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 判断队列是否已满
            if (count == items.length)
                return false;
            else {
                // 没满,入队
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    

    可以看到,ArrayBlockingQueue不允许存储null元素,这里会调用enqueue方法进行入队操作:

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        // 把元素放入到 数组的入队索引处
        items[putIndex] = x;
        // 元素放进去后,如果入队索引等于队列长度,表示已满
        // 重置入队索引为0
        if (++putIndex == items.length)
            putIndex = 0;
        // 数组元素加1
        count++;
        // 唤醒条件notEmpty上等待的线程
        notEmpty.signal();
    }
    

    可以看到该方法用于将元素保存到数组对应的入队索引处,并且唤醒条件notEmpty对应的线程,提醒线程该队列已经不为空了(not empty)。

    这里重置入队索引为0,表示队列其实是一种循环队列,也就是环形队列,队尾不一定要是物理上的队列末尾,而是逻辑上的队尾,通过这种环形队列的用法,可以减少不必要的元素拷贝(元素出队以后,不用把元素整体往前移动)。

    3.3 put方法

    put方法是一个阻塞方法,在入队的时候,如果队列已满,会一直阻塞,直到队列可用,并且该方法在当前线程被中断时会抛出异常。

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 可中断线程
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // 阻塞notFull条件对应的线程
                notFull.await();
            // 入队方法
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    3.4 offer(E, long, TimeUnit)方法

    支持超时的offer方法,表示入队的时候,如果队列已满,则等待指定的超时时间,如果超时时间结束,队列仍然已满,则返回false;

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    
        checkNotNull(e);
        // 获取超时时间
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 如果队列已满,无限循环
            while (count == items.length) {
                // 如果超时(超时时间小于等于0),返回false
                if (nanos <= 0)
                    return false;
                // 没有超时,等待
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    3.5 poll方法

    出队方法poll比较简单,表示获取并移除队头元素,如果队列为空,则返回null:

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 获取队头元素
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    

    这里,如果队列不为空的话,会调用出队方法dequeue方法,该方法和入队方法enqueue恰好相反:

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 获取出队元素索引
        E x = (E) items[takeIndex];
        // 原位置设置为空
        items[takeIndex] = null;
        // 如果队列为空(如果新的出队索引==数组长度),重置出队索引
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 数组容量减1
        count--;
        // 迭代器维护
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒notFull条件上的线程
        notFull.signal();
        return x;
    }
    

    首先获取出队元素索引处的值,然后判断队列是否为空,如果队列为空,重置出队索引为0,然后队列容量减1,出队后,唤醒notFull条件上的线程,提示线程表示队列已经not full了。

    这里通过++takeIndex == items.length来判断队列是否为空,和入队中的++putIndex == items.length来判断队列是否已满,可以看出ArrayBlockingQueue其实是一种环形队列。

    3.6 take方法

    出队方法take是一个阻塞方法,用于获取并移除队列中的队头元素,如果队列为空,则会一直阻塞,直到队列中元素可用;

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 如果队列为空,阻塞notEmpty条件对应的线程
            while (count == 0)
                notEmpty.await();
            // 否则,出队
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    3.7 poll(long, TimeUnit)

    支持超时的poll方法,表示获取并移除对头的元素,如果队列为空,则等待指定的超时时间,如果超时时间结束,队列仍然为空,返回false;

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // 获取超时时间
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 如果队列为空
            while (count == 0) {
                // 超时,返回null
                if (nanos <= 0)
                    return null;
                // 没有超时,等待
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    3.8 peek方法

    peek方法表示获取队列的对头元素,但不是个出队方法,也就是不移除元素:

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果队列为空,返回null
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
    
    final E itemAt(int i) {
        return (E) items[i];
    }
    

    从源码可以看出,peek方法直接获取数组出队索引处对应的元素。

    3.9 remove(Object)方法

    该方法表示从队列中删除指定的元素:

    public boolean remove(Object o) {
       // 如果元素是null,直接返回
       if (o == null) return false;
       final Object[] items = this.items;
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           // 如果队列中有元素
           if (count > 0) {
               // 入队索引
               final int putIndex = this.putIndex;
               // 出队索引
               int i = takeIndex;
               // 循环判断,从队头直到队尾
               do {
                   // 如果出队索引处(这里的出队索引不是原来的takeIndex,而是自定义的i,一直自增)元素
                   // 与要移除的元素相等,进行移除操作
                   if (o.equals(items[i])) {
                       // 移除元素
                       removeAt(i);
                       return true;
                   }
                   if (++i == items.length)
                       i = 0;
               } while (i != putIndex);
           }
           return false;
       } finally {
           lock.unlock();
       }
    }
    

    这里会调用removeAt方法来删除对应索引处的元素:

    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        // 如果要移除元素就是出队索引处元素,按照一般出队方法移除即可
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove
    
            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                // 循环中,会从要移除的下标处,向后递增,把数据依次前移,直到队尾
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                // 判断是否到了队尾的入队索引处
                if (next != putIndex) {
                    // 往前移动元素
                    items[i] = items[next];
                    i = next;
                } else {
                    // 到了队尾索引处,表示元素移动完成,再重新设置下对尾索引
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        // 最后唤醒notFull对应线程
        notFull.signal();
    }
    

    这里进行移除操作的时候,会先从要移除的下标处开始,后面的元素依次前移,最后直到队尾的入队索引,然后将队尾的入队索引前的最后一个元素设置为null,这样就移动完成,最后的时候唤醒notFull条件对应的线程,告诉该线程,队列已经not full了。

    需要注意的是,这种删除元素的方式,和数组中删除非两端的元素一样,会移动数组中许多元素,从本质上来讲是一种缓慢且有破坏性的操作,因此官方建议我们只有在特殊情况下才进行这个操作。理想情况下,只有当已知队列不被其他线程访问时才应该这样做。

    3.10 contains方法

    contains方法用来判断队列中是否包含某元素,步骤和remove操作差不多,这里就不多说了。

    public boolean contains(Object o) {
        // 非空判断
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果队列中有元素
            if (count > 0) {
                // 入队索引
                final int putIndex = this.putIndex;
                // 出队索引
                int i = takeIndex;
                do {
                    // 循环判断,从队头到队尾
                    if (o.equals(items[i]))
                        return true;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    3.11 clear方法

    clear方法表示清空队列中所有的元素,

    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 获取队列长度,如果队列中有元素
            int k = count;
            if (k > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                // 从队头开始遍历,直到队尾
                do {
                    // 将对应索引处元素设置为null
                    items[i] = null;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
                takeIndex = putIndex;
                count = 0;
                // 迭代器处理
                if (itrs != null)
                    itrs.queueIsEmpty();
                // 唤醒所有等待notFull条件的线程
                for (; k > 0 && lock.hasWaiters(notFull); k--)
                    notFull.signal();
            }
        } finally {
            lock.unlock();
        }
    }
    

    该方法会从队列出队索引开始遍历,直到入队索引(也就是从队头到队尾),然后将对应索引处的值都设置为null,并且最后唤醒所有等待notFull条件处理的线程。

    4. 代码示例

    接下来简单看一个简单的生产者与消费者例子,例子参考自官方API文档及本文底部的链接。首先来看下生产者代码:

    /**
     * 生产者
     */
    static class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
    
        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(produce(i));
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    
        public int produce(int i) {
            System.out.println("=== put: " + i + ", thread: " + Thread.currentThread().getName());
            return i;
        }
    }
    

    然后再看下消费者代码:

    /**
     * 消费者
     */
    static class Consumer implements Runnable {
        private final BlockingQueue<Integer> queue;
    
        Consumer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    TimeUnit.MILLISECONDS.sleep(500);
                    consume(queue.take());
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    
        public void consume(int x) {
            System.out.println("--- take: " + x + ", thread: " + Thread.currentThread().getName());
        }
    }
    

    最后,进行简单测试:

    private static ExecutorService executorService = Executors.newFixedThreadPool(3);
    
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
        Producer producer = new Producer(blockingQueue);
        Consumer consumer1 = new Consumer(blockingQueue);
        Consumer consumer2 = new Consumer(blockingQueue);
        executorService.submit(producer);
        executorService.submit(consumer1);
        executorService.submit(consumer2);
    }
    

    目的很简单,就是为了测试阻塞队列,先打印的肯定是put线程的语句,然后才会打印take线程的语句,来看下运行结果:

    === put: 0, thread: pool-1-thread-1
    === put: 1, thread: pool-1-thread-1
    --- take: 0, thread: pool-1-thread-2
    --- take: 1, thread: pool-1-thread-3
    === put: 2, thread: pool-1-thread-1
    --- take: 2, thread: pool-1-thread-2
    === put: 3, thread: pool-1-thread-1
    --- take: 3, thread: pool-1-thread-3
    === put: 4, thread: pool-1-thread-1
    --- take: 4, thread: pool-1-thread-2
    === put: 5, thread: pool-1-thread-1
    --- take: 5, thread: pool-1-thread-3
    === put: 6, thread: pool-1-thread-1
    --- take: 6, thread: pool-1-thread-2
    === put: 7, thread: pool-1-thread-1
    --- take: 7, thread: pool-1-thread-3
    === put: 8, thread: pool-1-thread-1
    --- take: 8, thread: pool-1-thread-2
    === put: 9, thread: pool-1-thread-1
    --- take: 9, thread: pool-1-thread-3
    

    四、总结

    到这,ArrayBlockingQueue中大部分的方法源码都学习过了,剩余一些源码,等用到的时候再来了解下,接下来来简单总结下ArrayBlockingQueue的一些特性:

    • ArrayBlockingQueue底层通过数组来实现,容量一旦确定,无法修改;
    • ArrayBlockingQueue是通过ReentrantLock和Condition条件来保证多线程的安全访问;
    • ArrayBlockingQueue中的ReentrantLock锁有公平和非公平策略,默认是非公平的;
    • ArrayBlockingQueue不是一般的常规队列,而是一种环形队列。

    本文参考自:
    《Java并发编程实战》
    Java 并发 --- 阻塞队列之ArrayBlockingQueue源码分析 - csdn.net
    【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)

    相关文章

      网友评论

          本文标题:Java1.8-ArrayBlockingQueue源码学习(五

          本文链接:https://www.haomeiwen.com/subject/fuqhuqtx.html