美文网首页
JUC ArrayBlockingQueue

JUC ArrayBlockingQueue

作者: 爱情小傻蛋 | 来源:发表于2019-08-24 14:32 被阅读0次

java.util.concurrent.ArrayBlockingQueue 是一个线程安全的、基于数组、有界的、阻塞的、FIFO 队列。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

此类基于 java.util.concurrent.locks.ReentrantLock 来实现线程安全,所以提供了 ReentrantLock 所能支持的公平性选择。

ArrayBlockingQueue类图结构

如图ArrayBlockingQueue内部有个数组items用来存放队列元素,putindex下标标示入队元素下标,takeIndex是出队下标,count统计队列元素个数,从定义可知道并没有使用volatile修饰,这是因为访问这些变量使用都是在锁块内,并不存在可见性问题。另外有个独占锁lock用来对出入队操作加锁,这导致同时只有一个线程可以访问入队出队,另外notEmpty,notFull条件变量用来进行出入队的同步。

另外构造函数必须传入队列大小参数,所以为有界队列,默认是Lock为非公平锁。

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
  }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

属性

队列的操作主要有读、写,所以用了两个 int 类型的属性作为下一个读写位置的的指针。存放元素的数组是 final 修饰的,所以数组的大小是固定的。对于并发控制,是所有的访问都必须加锁,并用两个条件对象用于协调读写操作。

// 队列存放元素的容器
final Object[] items;

// 下一次读取或移除的位置
int takeIndex;

// 存放下一个放入元素的位置
int putIndex;

// 队列里有效元素的数量
int count;

// 所有访问的保护锁
final ReentrantLock lock;

// 等待获取的条件
private final Condition notEmpty;

// 等待放入的条件
private final Condition notFull;

环绕处理

如果指针一直往前增加或一直往后减小,那么总会超出数组的有效索引范围。所以需要进行一些环绕处理。

// 指针前移
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

// 指针后移
final int dec(int i) {
    return ((i == 0) ? items.length : i) - 1;
}

注意,上面的处理都是对指针值的直接处理,而不关心是读指针还是写指针,因为是否有可读元素、可写空间的判断是通过对 count 计数来判断的。

这也是 count 的作用,它极大地简化了指针有效性的判断。在下面的 insert 和 extract 方法中根本就不需要对读写指针之间的位置关系进行判断,非常精妙。

通过环绕处理可以把这个数组看成是圆形的缓存。

添加元素

所有添加操作最终都是调用到内部方法 insert。

// 在持有锁的前提下调用
private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex); // 指针前移 1
    ++count; // 有效元素数量加 1
    notEmpty.signal(); // 通知在非空条件上等待的读线程
}
offer操作

在队尾插入元素,如果队列满则返回false,否者入队返回true。

public boolean offer(E e) {

    //e为null,则抛出NullPointerException异常
    checkNotNull(e);

    //获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列满则返回false
        if (count == items.length)
            return false;
        else {
            //否者插入元素
            insert(e);
            return true;
        }
    } finally {
        //释放锁
        lock.unlock();
    }
}

这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新会主内存中。

另外这个队列是使用循环数组实现,所以计算下一个元素存放下标时候有些特殊。另外insert后调用 notEmpty.signal();是为了激活调用notEmpty.await()阻塞后放入notEmpty条件队列中的线程。

put操作

在队列尾部添加元素,如果队列满则等待队列有空位置插入后返回

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;

//获取可被中断锁
lock.lockInterruptibly();
try {

    //如果队列满,则把当前线程放入notFull管理的条件队列
    while (count == items.length)
        notFull.await();

    //插入元素
    insert(e);
} finally {
    lock.unlock();
}

}
需要注意的是如果队列满了那么当前线程会阻塞,知道出队操作调用了notFull.signal方法激活该线程。

代码逻辑很简单,但是这里需要思考一个问题为啥调用lockInterruptibly方法而不是Lock方法。我的理解是因为调用了条件变量的await()方法,而await()方法会在中断标志设置后抛出InterruptedException异常后退出,所以还不如在加锁时候先看中断标志是不是被设置了,如果设置了直接抛出InterruptedException异常,就不用再去获取锁了。然后看了其他并发类里面凡是调用了await的方法获取锁时候都是使用的lockInterruptibly方法而不是Lock也验证了这个想法。

移除元素

所有读取操作最终都是调用到内部方法 extract。

// 在持有锁的前提下调用
private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null; // for GC,避免内存泄露;也用于判断元素是否被移除
    takeIndex = inc(takeIndex); // 指针前移 1
    --count; // 有效元素数量减 1
    notFull.signal(); // 通知在非满条件上等待的写线程
    return x;
}
poll操作

从队头获取并移除元素,队列为空,则返回null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //当前队列为空则返回null,否者
        return (count == 0) ? null : extract();
    } finally {
        lock.unlock();
    }
}
take操作

从队头获取元素,如果队列为空则阻塞直到队列有元素。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {

        //队列为空,则等待,直到队列有元素
        while (count == 0)
            notEmpty.await();
        return extract();
    } finally {
        lock.unlock();
    }
}

需要注意的是如果队列为空,当前线程会被挂起放到notEmpty的条件队列里面,直到入队操作执行调用notEmpty.signal后当前线程才会被激活,await才会返回。

移除指定位置元素

// 在持有锁的前提下调用
void removeAt(int i) {
    final Object[] items = this.items;
    // 如果要移除是元素就是下一个可读数据,直接移除、修改读指针即可。
    // 这是一种优化,避免数据拷贝。
    if (i == takeIndex) {
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
    } else {
      // 如果要移除元素是在有效数据的中间,那么要把它之后添加的元素后移
      // 注意:这里不能用读写指针的大小关系作为终结条件,也是因为环绕。
        for (;;) {
            int nexti = inc(i);
            if (nexti != putIndex) {
                items[i] = items[nexti];
                i = nexti;
            } else {
                items[i] = null; // for GC
                putIndex = i; // putIndex 不是直接减 1 还是因为环绕。
                break;
            }
        }
    }
    --count;
    notFull.signal(); //
}

获取元素

peek操作

返回队列头元素但不移除该元素,队列为空,返回null

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //队列为空返回null,否者返回头元素
        return (count == 0) ? null : itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return this.<E>cast(items[i]);
}

获取长度

size操作

获取队列元素个数,非常精确因为计算size时候加了独占锁,其他线程不能入队或者出队或者删除元素。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

总结

ArrayBlockingQueue通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似在方法上添加synchronized的意味。其中offer,poll操作通过简单的加锁进行入队出队操作,而put,take则使用了条件变量实现如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的,因为计算前加了全局锁。

相关文章

网友评论

      本文标题:JUC ArrayBlockingQueue

      本文链接:https://www.haomeiwen.com/subject/bmnzsctx.html