美文网首页
读ArrayBlockingQueue源码记录

读ArrayBlockingQueue源码记录

作者: 浅笑丨无痕 | 来源:发表于2018-08-18 01:08 被阅读0次

简介

ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。

ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。

探索ArrayBlockingQueue

1. 主要属性


//存储队列元素的数组,是个循环数组

final Object[] items;

//拿数据索引

int takeIndex;

//放数据索引

int putIndex;

//元素个数

int count;

//重入锁

final ReentrantLock lock;

//条件对象

private final Condition notEmpty;

private final Condition notFull;

从属性可以看到,ArrayBlockingQueue的内部采用数组进行存储,采用重入锁(ReentrantLock lock)保证线程安全,利用条件对象Condition实现可阻塞式的出入队列。

2. 初始化


public ArrayBlockingQueue(int capacity) {

    this(capacity, false);

}

public ArrayBlockingQueue(int capacity, boolean fair) {

    if (capacity <= 0)

        throw new IllegalArgumentException();

    this.items = new Object[capacity];//初始化数组长度

    lock = new ReentrantLock(fair); //创建重入锁

    notEmpty = lock.newCondition(); //由lock创建条件对象

    notFull =  lock.newCondition();

}

构造函数还是很容易理解的,初始化数组,创建重入锁。其中fair是“可重入的独占锁(ReentrantLock)”的类型。fair为true,表示是公平锁;fair为false,表示是非公平锁。Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。

3.插入

ArrayBlockingQueue的插入方法有add、offer、put.

add()方法,可以从源码看到实际调用的是offer方法。


public boolean add(E e) {

    return super.add(e); //调用父类的add()

}

//父类的方法

public boolean add(E e) {

    if (offer(e))

        return true;

    else

        throw new IllegalStateException("Queue full");

}

再看offer方法:作用是将元素e插入到阻塞队列的尾部,如果队满,返回false,即插入失败。否则,插入元素,这里调用的是私有方法insert();


public boolean offer(E e) {

    // 创建插入的元素是否为null,是的话抛出NullPointerException异常

    checkNotNull(e);

    // 加锁,保证线程安全

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        //队列满,返回false

        if (count == items.length)

            return false;

        else {

            //插入方法

            insert(e);

            return true;

        }

    } finally {

        lock.unlock();

    }

}

接着看insert()源码,在插入元素后唤醒notEmpty的等待线程


private void insert(E x) {

    //将x放入数组中

    items[putIndex] = x;

    //设置下一个放入索引

    putIndex = inc(putIndex);

    //元素数量+1

    ++count;

    //唤醒notEmpty的等待线程

    notEmpty.signal();

}

inc()方法:若i+1的值等于“队列的长度”,即添加元素之后,队列满;则设置“下一个被添加元素的索引”为0。


final int inc(int i) {

    return (++i == items.length) ? 0 : i;

}

offer()还有另一个重载方法.这个重载方法多了两个参数,实现的是超时退出;如果队列为满,会使得当前线程进入等待状态等待一定的时长,等待期间如果队列不为满了就会被唤醒,然后元素添加成功,如果超过了参数设置的时限队列仍为满,元素就会添加失败,返回false。


public boolean offer(E e, long timeout, TimeUnit unit)

    throws InterruptedException {

    checkNotNull(e);

    long nanos = unit.toNanos(timeout);

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly(); //允许线程中断

    try {

        while (count == items.length) {

            if (nanos <= 0)

                return false;

            nanos = notFull.awaitNanos(nanos);//等待并且设置最大等待时间

        }

        insert(e);

        return true;

    } finally {

        lock.unlock();

    }

}

put()方法:与可延迟的offer()相似,唯一不同的是没有设置超时等待时间,即当队列为空时,线程一直等待下去。


public void put(E e) throws InterruptedException {

    checkNotNull(e);

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        while (count == items.length)

            notFull.await();

        insert(e);

    } finally {

        lock.unlock();

    }

}

4.取出

ArrayBlockingQueue有三个取出元素的方法,分别为poll(),take().

poll()方法:可以看到poll()方法实际调用extract()方法。


public E poll() {

    // 加锁,保证线程安全

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        // 队列为空返回null,否则调用extract方法

        return (count == 0) ? null : extract();

    } finally {

        lock.unlock();

    }

}

再看extract()方法:方法也很简单,获取取位置上的元素后将其位置的设置为空,并唤醒notFull上的等待线程


private E extract() {

    final Object[] items = this.items;

    //获取取索引上的元素,强制将元素转换为“泛型E”

    E x = this.<E>cast(items[takeIndex]);

    //将取索引上的位置设置为null,即删除

    items[takeIndex] = null;

    //设置下一个取的位置

    takeIndex = inc(takeIndex);

    //队列长度减一

    --count;

    // 唤醒notFull上的等待线程。

    notFull.signal();

    return x;

}

同时,poll()也有一个重载方法poll(long timeout, TimeUnit unit) ,该方法指定等待时间,如果队列无元素,会使得当前线程进入等待状态等待一定的时长,等待期间如果队列数量不为空了就会被唤醒,获取元素成功,如果超过了参数设置的时限队列仍为空,返回false。


public E poll(long timeout, TimeUnit unit) throws InterruptedException {

    long nanos = unit.toNanos(timeout);

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();//设置线程可中断

    try {

        while (count == 0) {

            if (nanos <= 0)

                return null;

            nanos = notEmpty.awaitNanos(nanos);//指定时间等待

        }

        return extract();

    } finally {

        lock.unlock();

    }

}

take()方法与等待poll()相似,唯一不同就是没有设置超时时间,即如果队列为空,会无限等待,除非线程被中断。


public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        while (count == 0)

            notEmpty.await();

        return extract();

    } finally {

        lock.unlock();

    }

}

5.其他方法

ArrayBlockingQueue还提供其他方法,例如size()获取当前队列长度,该方法与ConcurrentHashMap的size()不同,ArrayBlockingQueue的size()方法不需要进行多次对比判断是否改变,所以性能比ConCurrentHashMap的size()效率高.


public int size() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        return count;

    } finally {

        lock.unlock();

    }

}

clear()方法清空队列,remove(object)移除指定元素等,这里不再进行延伸。

相关文章

网友评论

      本文标题:读ArrayBlockingQueue源码记录

      本文链接:https://www.haomeiwen.com/subject/doexiftx.html