J.U.C 阻塞队列源码剖析系列(二)之 ArrayBlocki

作者: 爱打乒乓的程序员 | 来源:发表于2020-02-17 09:51 被阅读0次

这一篇文章开始,将重点学习 ArrayBlockingQueue 的源码,并在啃 ArrayBlockingQueue 源码中,学习API设计者的思想,对于咱们写代码的能力也会有帮助!

介绍:

ArrayBlockingQueue 从字面意思上不难看出,这是一个基于数组的阻塞队列。那么在学习其源码之前,先提出一些疑问:

1.ArrayBlockingQueue 有哪些特性?
2.数组索引是怎样得出的?

带着这些疑问往下看吧~

先从 ArrayBlockingQueue 的 Demo 示例作为看源码的入口。

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 初始化 ArrayBlockingQueue 对象,必须指定数组的容量,公平锁策略默认是非公平锁
        ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
        for (int i = 1; i < 20; i++) {
//            queue.add(i);// 不能继续插入元素时会抛异常
            queue.offer(i);// 当不能继续插入元素时不会抛异常,而是返回false跳出offer方法的执行
        }
        // peek方法只会读取数组第一个元素
        System.out.println(queue.peek());
        System.out.println("执行peek()方法后,数组的长度为:" + queue.size());
        // poll方法会读取并删除数组第一个元素
        System.out.println(queue.poll());
        System.out.println("执行poll()方法后,数组的长度为:" + queue.size());
        // 往数组塞一个元素,当在不可执行的时候会阻塞线程,直到可以被其它线程唤醒
        queue.put(100);
        System.out.println("执行put()方法后,数组的长度为:" + queue.size());
        // take方法会获取数组第一个元素,当在不可执行的时候会阻塞当前线程,直到可以被其它线程唤醒
        System.out.println(queue.take());
        System.out.println("执行take()方法后,数组的长度为:" + queue.size());
        System.out.println("队列存在“5”这个元素:" + queue.contains(5));
        System.out.println("队列存在“5”这个元素并删除:" + queue.remove(5));
        System.out.println("队列存在“5”这个元素:" + queue.contains(5));
        queue.clear();
        System.out.println("执行clear()方法后,数组的长度为:"  + queue.size());
    }
}

运行结果:

1
执行peek()方法后,数组的长度为:10
1
执行poll()方法后,数组的长度为:9
执行put()方法后,数组的长度为:10
2
执行take()方法后,数组的长度为:9
队列存在“5”这个元素true
队列存在“5”这个元素并删除:true
队列存在“5”这个元素:false
执行clear()方法后,数组的长度为:0

示例中,在关键的方法加上了注释,如果对运行的结果感到不解,那就带着疑问看继续往下看,然后再回过头思考一下,我相信你一定会豁然开朗的!譬如源码中多次出现 takeIndex 和 putIndex 这两个变量,一开始你可能不清楚有什么作用,但你看完这篇源码剖析和总结,你就知道这两个变量是多么妙!不仅提高性能,还减少代码量!


源码剖析

在学习任何一个技术之前,先看下其官方介绍,这样会对这项技术有一个基本的了解。那么 ArrayBlockingQueue 的官方介绍在哪里呢?其实就是在 ArrayBlockingQueue 类的注释呀!(废话!hhhhh)

简单的翻译下 ArrayBlockingQueue 类的注释可以得出以下信息:

  • 有界的阻塞数组,数组大小一旦创建,则不能再改变(数组不会动态扩容)
  • 数组的读写顺序是先进先出(FIFO)
  • 当队列满的情况下,put操作会被阻塞;当队列为空的时候,take操作会被阻塞
  • 等待策略可以选择公平或非公平行为(公平行为:数组元素是顺序的新增;非公平行为:数组元素新增的顺序是随机的)
成员变量
    // 使用数组保存的元素
    final Object[] items;
    // 下次读取元素的索引
    int takeIndex;
    // 下次添加元素的索引
    int putIndex;
    // 当前队列中元素的个数
    int count;
    // 可重入锁
    final ReentrantLock lock;
    // 等待获取元素的锁
    private final Condition notEmpty;
    // 等待添加元素的锁
    private final Condition notFull;
    // 迭代器状态
    transient Itrs itrs = null;
ArrayBlockingQueue 构造方法
    // 根据参数初始化数组容量,默认为非公平锁
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    // 根据参数初始化数组容量和设置是否是公平锁
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    // 使用一个 Collection 来作为队列的默认元素
    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

ArrayBlockingQueue 只有三个构造方法,但都要在创建对象的时候指定数组容量,还可以根据需求设置 ReentrantLock 是否为公平锁。【对 ReentrantLock 不熟悉的朋友,可以参考我相关的拙作:https://www.jianshu.com/p/842aa75f77b7

返回数组元素个数:size()
    // 返回数组元素个数
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
新增元素:add(E e)、offer(E e) 和 put(E e)
    // 实际上是调用父类(AbstractQueue)的add方法,AbstractQueue.add(E e)实际上是调用子类offer插入元素
    public boolean add(E e) {
        return super.add(e);
    }
    // AbstractQueue 抽象类的add方法
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    // 添加元素,如果队列满的话不会阻塞,直接返回false
    public boolean offer(E e) {
        // 判断元素是否为空
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            // 如果数组已经满了,返回false
            if (count == items.length)
                return false;
            else {
                // 调用 enqueue 插入数据并返回true
                enqueue(e);
                return true;
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    // 只有当加锁成功才会调用该方法
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        // 如果数组索引达到数组的阀值,重新将 putIndex 重置为 0
        if (++putIndex == items.length)
            putIndex = 0;
        //入列成功,当前队列元素数量自增
        count++;
        // 因为已经有数据了,就可以唤醒 notEmpty Condition了
        notEmpty.signal();
    }
    // 添加元素,队列满的话会阻塞
    public void put(E e) throws InterruptedException {
        // 检查元素是否为null
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 加锁,锁定线程,除非当前线程是interrupted
        lock.lockInterruptibly();
        try {
            // 判断数组是否已满
            while (count == items.length)
                notFull.await();// 数组已满,线程等待
            // 将指定元素添加到队列尾部
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

从源码中可以知道,使用 add 方法的时候实际上是调用 offer 方法,如果队列满的情况下,add 方法直接抛出Queue full异常;而使用 offer 方法通过可重入锁(ReentrantLock)保证线程安全,当队列满的情况下会直接返回 false;put 方法则会在队列满的情景下阻塞线程,直到队列数组有位置。

值得一说的是,enqueue 方法中,当队列数组满的时候,会将 putIndex 设置为 0。这是什么意思呢?这就是上面提到的 takeIndex 和 putIndex 妙处。实际上,你可以认为 ArrayBlockingQueue 的数组是一个环形数组(这不禁让我想起高性能并发框架 Disruptor 的核心 RingBuffer,可以参考我另外一篇文章对 Disruptor 的介绍:https://www.jianshu.com/p/0f35c3866780)。

参考上图,假设队列数组的长度为 8,当队列数组索引 0~7 位置都塞满数据的情景下,设置 putIndex 为 0,意味着下一次添加数组索引位置是 0,当执行 take 方法的时候会从数组头部,也就是索引为 0 的位置开始读取删除数据,那么现在索引位置 1~7 都是数据,而索引为 0 的位置是不是空啦?所以当执行 offer 方法或 put 方法的时候就会将数据新增到索引为 0 的位置上。

读取元素:peek() 和 poll() 和 take()
    // 读取队列元素,但不删除
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 获取索引位置为 takeIndex 的元素
            return itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }
    @SuppressWarnings("unchecked")
    final E itemAt(int i) {
        // 根据索引从队列数组中获取元素
        return (E) items[i];
    }
    // 读取队列元素并删除
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果队列中元素的个数为0,返回null;否则执行dequeue方法
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 获取队列数组索引为 takeIndex 的元素
        E x = (E) items[takeIndex];
        // 把数组索引为 takeIndex 的元素设置为 null
        items[takeIndex] = null;
        // 下一次取元素的索引值累加1
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 出列成功,数组元素数量 -1
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
    
    // 读取队列元素并删除,如果队列空的话会阻塞
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 判断当前数组元素个数是否为 0
            while (count == 0)
                notEmpty.await();// 如果数组为空则当前线程阻塞,直到其它线程唤醒才继续执行
            // 读取数组元素
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

读取队列数组元素是根据 takeIndex 作为索引获取的,与新增方法类似,当读取数组元素的索引值到了数组的末端,则会从头开始读取,也就是 dequeue 方法的 takeIndex = 0

删除数组元素:remove(Object o)
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                // 循环数组元素,找到要移除的元素调用removeAt()方法并返回true,如果循环到最后没找到元素则跳出循环返回false
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    void removeAt(final int removeIndex) {
        final Object[] items = this.items;
        // 如果移除的元素就是索引位置为 takeIndex 的元素
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            final int putIndex = this.putIndex;
            // 循环队列,查找到元素,再移除元素
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            // 数组数量减1
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        // 唤醒 notFull
        notFull.signal();
    }
查找包含指定的元素:contains(Object o)
    public boolean contains(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 判断数组是否有元素,如果没有直接返回false
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                // 循环数组元素,如果找到对应的元素返回true,若一直循环到插入索引的位置都没找到元素就跳出循环返回false
                do {
                    if (o.equals(items[i]))
                        return true;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
清空链表所有的节点:clear()
    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                // 遍历数组元素,都设置为null,便于GC回收
                do {
                    items[i] = null;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
                takeIndex = putIndex;
                count = 0;
                if (itrs != null)
                    itrs.queueIsEmpty();
                // 唤醒等待线程,防止其它线程一直阻塞,损耗CPU的资源
                for (; k > 0 && lock.hasWaiters(notFull); k--)
                    notFull.signal();
            }
        } finally {
            lock.unlock();
        }
    }

通过以上 ArrayBlockingQueue 源码的剖析,相信你应该不难回答文章开头的问题啦。

总结:

通过上面的源码不难看出,ArrayBlockingQueue 的核心就是使用 ReentrantLock 控制队列的访问,通过两个 Condition 控制线程之间的通信;takeIndexputIndex 两个变量分别指定队列入队和出队的位置,当 takeIndex 或 putIndex 达到边界则重置为0,相当于 ArrayBlockingQueue 数组是一个环形数组。环形数组比常见的顺序数组有什么好处?通过上面的增删元素的源码分析,你有发现需要移动数组的元素吗?如果是顺序数组的话,还要维护数组索引的位置,当增删元素的时候要移动大量的元素,性能差!而环形数组在增删元素的时候只需要改变入队和出队的位置就行,大大提高性能!

下一篇为:J.U.C 阻塞队列源码剖析系列(三)之 LinkedBlockingQueue

如果觉得源码剖析不错的话,麻烦点个赞哈!对于文章有哪里不清楚或者有误的地方,欢迎在评论区留言~

相关文章

网友评论

    本文标题:J.U.C 阻塞队列源码剖析系列(二)之 ArrayBlocki

    本文链接:https://www.haomeiwen.com/subject/orbixhtx.html