美文网首页
[并发集合] ArrayBlockingQueue源码分析

[并发集合] ArrayBlockingQueue源码分析

作者: LZhan | 来源:发表于2020-01-03 15:21 被阅读0次

转自 公众号 【彤哥说源码】https://mp.weixin.qq.com/s/EN7qY1w4e8C0ZXiP7i82TA

1 前言

ArrayBlockingQueue,是java并发包下的一个以数组实现的阻塞队列,它是线程安全的。

2 源码分析

2.1 重要属性

    /** The queued items */
    // 使用数组存储元素
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    // 取元素的指针
    int takeIndex;

    /** items index for next put, offer, or add */
    // 放元素的指针
    int putIndex;

    /** Number of elements in the queue */
    // 元素数量
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    // 保证并发访问的锁
    final ReentrantLock lock;

    /** Condition for waiting takes */
    // 非空条件
    private final Condition notEmpty;

    /** Condition for waiting puts */
    // 非满条件
    private final Condition notFull;

通过以上可得,
(1)ArrayBlockingQueue是利用数组来存储元素
(2)通过放指针和取指针来标记下一次操作的位置
(3)利用重入锁来保证并发安全

2.2 构造方法

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 初始化数组
        this.items = new Object[capacity];
        // 创建重入锁以及两个条件
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

在构造时,必须传入容量capacity,也就是数组的大小;
通过构造方法控制重入锁的类型是公平锁还是非公平锁

2.3 入队

入队有4个方法,分别是add(E e),offer(E e),put(E e),offer(E e, long timeout, TimeUnit unit)

    public boolean add(E e) {
        // 调用父类的add(e)方法
        return super.add(e);
    }

    public boolean add(E e) {
        // 调用ArrayBlockingQueue的offer方法,成功返回true
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // count是队列总的元素
            // items是存储数组的长度
            if (count == items.length)
                return false;
            else {
                // 调用enqueue方法
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加锁,如果线程中断了抛出异常
        lock.lockInterruptibly();
        try {
            // 如果数组满了,使用notFull等待
            // notFull等待的意思就是说现在队列满了
            // 只有取走一个元素之后,队列才不满,然后唤醒notFull,继续现在的逻辑  
            // 这里用while而不是if,是因为有可能多个线程阻塞在lock上
            // 即使唤醒了可能其它线程先一步修改了队列又变成满的了,需要再次等待
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }


    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 如果数组满了,就阻塞nanos纳秒
            // 如果唤醒这个线程时依然没有空间且时间到了就返回false
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

enqueue方法如下:

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        // 将元素直接放到放指针的位置上
        items[putIndex] = x;
        // 如果放指针到数组尽头了,就返回头部
        if (++putIndex == items.length)
            putIndex = 0;
        // 数量加1
        count++;
        //唤醒notEmpty,因为入队了一个元素,所以肯定不为空
        notEmpty.signal();
    }

所以,
(1)add(e)时如果队列满了,就抛出异常
(2)offer(e)时如果对列满了,就返回false
(3)put(e)时如果队列满了则使用notFull等待
(4)offer(e, timeout, unit)时如果队列满了则等待一段时间后如果队列依然满就返回false;
(5)利用放指针循环使用数组来存储元素;

2.4 出队

出队有4个方法,分别是remove()、poll()、take()、poll(long timeout, TimeUnit unit)

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果队列没有元素,则返回null,否则出队
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
           // 如果队列无元素,则阻塞等待nanos纳秒,并释放锁
           // 如果下一次这个线程获得了锁但是队列依然没有元素且已超时,返回null
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 尝试获取锁,如果此时锁被其他线程锁占用,那么当前线程就处于Waiting状态
        lock.lockInterruptibly();
        try {
            // 如果此时队列中的元素个数为0,那么就让当前线程wait,并且释放锁
            while (count == 0)
                notEmpty.await();
            // 如果队列不为空,则从队列的头部获取元素
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

dequeue方法如下:

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 根据taskIndex获取元素,因为元素是一个Object类型的数组,因此它通过cast方法将其转换成泛型
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        // 将taskIndex进行++操作,达到循环
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒其他等待的线程
        notFull.signal();
        return x;
    }

(1)poll()时如果队列为空则返回null
(2)take()时如果队列为空则阻塞在等待条件notEmpty上
(3)poll(timeout,unit)时,如果队列为空则阻塞等待一段时间后,就返回null
(4)利用取指针循环从数组中获取元素

3 总结

(1)ArrayBlockingQueue不需要扩容,因为初始化时指定容量,并且循环利用数组;
(2)ArrayBlockingQueue利用taskIndex和putIndex循环利用数组;
(3)入队和出队各定义了四组方法为满足不同的用途;
(4)利用重入锁和两个条件保证并发安全;

面试题目:
(1)论BlockingQueue中的那些方法?

(2)ArrayBlockingQueue有哪些优缺点?
a) 队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量;
b) 如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险;
c) 只使用了一个锁来控制入队出队,效率较低,那是不是可以借助分段的思想把入队出队分裂成两个锁呢?且听下回分解。

相关文章

网友评论

      本文标题:[并发集合] ArrayBlockingQueue源码分析

      本文链接:https://www.haomeiwen.com/subject/sgyzoctx.html