美文网首页
BlockingQueue详解

BlockingQueue详解

作者: 壹元伍角叁分 | 来源:发表于2022-04-01 16:38 被阅读0次

在Java中,BlockingQueue是一个接口,它的实现类有 ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、 LinkedBlockingQueue、PriorityBlockingQueue、 SynchronousQueue等,它们的区别主要体现在存储结构上或对元素 操作上的不同,但是对于take与put操作的原理,却是类似的。

阻塞与非阻塞

  • 入队

    offer(E e):如果队列没满,立即返回true;如果队列满了,立即返回 false --> 不阻塞

    put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中 断 --> 阻塞

  • 出队

    poll():如果没有元素,直接返回null;如果有元素,出队 --> 不阻塞

    take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断 --> 阻塞

一、LinkedBlockingQueue

LinkedBlockingQueue可以指定容量,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。

1、底层数据结构

LinkedBlockingQueue内部是使用链表实现一个队列的,但是有别于一般的队列,在于该队列至少是有一个节点的,头节点不含有元素。如果队列为空时,头节点的next参数为null。尾节点的next参数也为null。

2、主要变量

// 容量限制,如果没有指定,则为 Integer.MAX_VALUE
private final int capacity;

// 当前队列中的元素数量
private final AtomicInteger count = new AtomicInteger();

// 队列头节点,始终满足head.item == null
transient Node<E> head;

// 队列的尾节点,始终满足last.next == null
private transient Node<E> last;

// 由 take、poll 等持有的锁
private final ReentrantLock takeLock = new ReentrantLock();

// 当队列为空时,保存执行出队的线程
private final Condition notEmpty = takeLock.newCondition();

// 由 put、offer 等持有的锁
private final ReentrantLock putLock = new ReentrantLock();

// 当队列满时,保存执行入队的线程
private final Condition notFull = putLock.newCondition();

3、put(E e)方法

put(E e)方法用于将一个元素插入到队列的尾部。在队列满时会阻塞,直到队列中有元素被取出。

// 在此队列的尾部插入指定元素,如有必要,等待空间可用。
public void put(E e) throws InterruptedException {
    // 不允许元素为null
    if (e == null) throw new NullPointerException();
    
    int c = -1;
    // 以当前元素新建一个节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获得入队的锁
    putLock.lockInterruptibly();
    try {
        // 如果队列已满,那么将该线程加入到Condition的等待队列中
        while (count.get() == capacity) {
            notFull.await();
        }
        // 当队列未满,然后有出队线程取出导致,将节点入队
        enqueue(node);
        // 得到插入之前队列的元素个数。getAndIncrement返回的是 +1 前的值
        c = count.getAndIncrement();
        // 如果还可以插入元素,那么释放等待的入队线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放入队的锁
        putLock.unlock();
    }
    // 如果插入队列之前元素个数为0,插入后就通知出队线程队列非空
    if (c == 0)
        signalNotEmpty();
}

put方法总结:

1、LinkedBlockingQueue不允许插入的元素为null;

2、同一时刻只有一个线程可以进行入队操作,putLock在将元素插入队列尾部时加锁了;

3、如果队列满了,则会调用notFull.await(),将该线程加入到Condition等待队列中。await方法会释放线程占有的锁,这将导致之前由于被阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为队列仍旧是满的,也被进入到条件队列中;

4、一旦有出队线程取走元素,就会通知到入队等待队列释放线程。那么第一个加入到Condition队列中的将会被释放,那么该线程将会重新获得put锁,继而执行enqueue()方法,将节点插入到队列的尾部;

5、然后得到插入队列前元素的个数,如果插入后队列中还可以继续插入元素,那么就通知notFull条件的等待队列中的线程;

6、如果插入队列前个数为0,那现在插入后,就为1了,那就可以通知因为队列为空而导致阻塞的出队线程去取元素了。

4、E take()方法

take()方法用于得到队头的元素,在队列为空时会阻塞,直到队列中有元素可取。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takeLock锁
    takeLock.lockInterruptibly();
    try {
        // 如果队列中元素数量为0,则将出队线程加入到notEmpty队列中进行等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 得到到队头的元素
        x = dequeue();
        // 得到出队列前元素的个数。getAndDecrement返回的是 -1 前的值
        c = count.getAndDecrement();
        // 如果出队列前的元素数量大于1,那说明还可以继续取,那就释放在notEmpty队列的第一个线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放出队锁
        takeLock.unlock();
    }
    // 如果出队前队列是满的,那现在取走一个元素了,队列就不满了,就可以去通知等待中的入队线程了。
    if (c == capacity)
        signalNotFull();
    return x;
}

take方法总结:

1、同一时刻只有一个线程可以进行出队操作,takeLock在出队之前加锁了;

2、如果队列中元素为空,那就进入notEmpty队列中进行等待。直到队列不为空时,得到队列中的第一个元素。当发现取完发现还有元素可取时,再通知一下notEmpty队列中等待的其他线程。最后判断自己取元素前的是不是满的,如果是满的,那自己取完,就不满了,就可以通知在notFull队列中等待插入的线程进行put了。

5、remove()方法

remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回 false ;有的话则删除并返回true。入队和出队都是只获取一个锁,而 remove()方法需要同时获得两把锁,

public boolean remove(Object o) {
    // 因为队列不包含null元素,返回false
    if (o == null) return false;
    // 获取两把锁
    fullyLock();
    try {
        // 从头的下一个节点开始遍历
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果匹配,那么将节点从队列中移除,trail表示需要删除节点的前一节点
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 释放两把锁
        fullyUnlock();
    }
}
/**
 * 锁定以防止 put 和 take.
 */
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

/**
 * 解锁以允许 put 和 take.
 */
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

6、总结

LinkedBlockingQueue允许两个线程同时在两端进行入队和出队操作,但一端同时只能有一个线程进行操作,是通过两个锁进行区分的。

为了维护底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(进行+1操作),另一个是出队的方法(进行-1操作),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。

二、ArrayBlockingQueue

// 构造方法
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

// 创建具有给定(固定)容量和指定访问策略的ArrayBlockingQueue 。
// 参数:capacity —— 这个队列的容量
        fair     —— 如果为true ,则在插入或删除时阻塞的线程的队列访问将按 FIFO 顺序处理;如果为false ,则未指定访问顺序。
public ArrayBlockingQueue(int capacity, boolean fair) {
    //  如果capacity < 1,抛出异常
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

1、底层数据结构

ArrayBlockingQueue内部是使用数组实现一个队列的,并且在构造方法中就需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此在队列满的时候执行入队会阻塞,在队列为空时出队也会阻塞。

2、主要变量

// 元素数组,其长度在构造方法中指定
final Object[] items;

// 队列中实际的元素数量
int count;

// 保护所有通道的主锁
final ReentrantLock lock;

// 等待take条件的队列
private final Condition notEmpty;

// 等待put条件的队列
private final Condition notFull;

3、put(E e)

在此队列的尾部插入指定元素,如果队列已满,则等待空间可用

public void put(E e) throws InterruptedException {
    // 检查元素是否为null,如果是,抛出NullPointerException
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        // 当队里中的元素数量等于数组长度,则队列已满,阻塞,等待队列成为不满状态
        while (count == items.length)
            notFull.await();
        // 将元素入队
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}

put方法总结:

1、ArrayBlockingQueue不允许添加null元素;

2、ArrayBlockingQueue在队列已满的时候,会调用notFull.await(),释放锁并处于阻塞状态;

3、一旦ArrayBlockingQueue在队列不满的时候,就立即入队。

4、E take()

取出此队列的头部元素,如果队列空,则阻塞,等待元素可取。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        // 当队列中元素数量为0时,则进入阻塞状态
        while (count == 0)
            notEmpty.await();
        // 队列不为空是,调用dequeue()出队
        return dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}

take方法总结:

1、取元素时,一旦获得锁,队列为空, 则会阻塞,直至不为空,调用dequeue()出队。

5、总结

ArrayBlockingQueue是一个底层结构是数组的阻塞队列,是通过 ReentrantLockCondition 来实现的。不可插入为null的元素,入队和出队使用的是同一个锁。意味着同一时刻只能有一个线程能进行入队或者出队操作。入队时,队列已满则会调用notFull.await(),进入阻塞状态。直到队列不满时,再进行入队操作。当出队时,队列为空,则调用notEmpty.await(),进入阻塞状态,直到队列不为空时,则出队。

三、LinkedBlockingQueue和ArrayBlockingQueue的区别

1、底层实现不同

LinkedBlockingQueue底层实现是链表,ArrayBlockingQueue底层实现是数组

2、队列容量

LinkedBlockingQueue默认的队列长度是Integer.Max,但是可以指定容量。在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多;

ArrayBlockingQueue必须在构造方法中指定队列长度,不可变。在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高。

3、锁的数量

LinkedBlockingQueue有两把锁,可以有两个线程同时进行入队和出队操作,但同时只能有一个线程进行入队或出队操作。

ArrayBlockingQueue只有一把锁,同时只能有一个线程进行入队和出队操作。

相关文章

网友评论

      本文标题:BlockingQueue详解

      本文链接:https://www.haomeiwen.com/subject/isvajrtx.html