美文网首页
ArrayBlockingQueue源码学习

ArrayBlockingQueue源码学习

作者: leilifengxingmw | 来源:发表于2019-03-10 12:14 被阅读0次

JDK版本:1.8

ArrayBlockingQueue的继承结构

ArrayBlockingQueue hierarchy.png

先看一下ArrayBlockingQueue类的介绍

ArrayBlockingQueue是一个由数组支持的有界阻塞队列。遵循FIFO的原则。在队列的末尾插入元素,在队列的头部取出元素。内部其实是通过循环数组来实现的。当向数组中加入数据,到达末尾的时候,则重新指向数组的头部,下一次加入数据从index=0开始;当从数组中取出数据的时候,如果到达了数组的末尾,则重新指向数组的头部,下一次取数据从index=0开始。(看看enqueue方法和dequeue方法就明白了)

ArrayBlockingQueue内部使用一个固定大小的数组来存储元素。一旦ArrayBlockingQueue被创建,则数组的容量不能改变。尝试向一个已经满的队列中插入数据会导致阻塞。尝试从一个空的队列中取出数据也会阻塞。

ArrayBlockingQueue提供一个可选的公平策略来对等待的生产者线程和消费者线程进行排序。默认情况下是不排序的。但是一个公平的队列可以保证线程按照FIFO的顺序来向队列中加入数据或者从队列中取出数据。公平策略通常会降低吞吐量,但是可以降低可变性并避免饥饿。(降低可变性啥意思?不太明白)

先看一下 中几个成员变量

   /** 存储队列中的元素 */
    final Object[] items;

    /** 下一个要被 take,poll或者remove的元素的index */
    int takeIndex;
 
    /** 下一次 put, offer, 或者 add元素的index */
    int putIndex;

    /** 队列中元素的个数*/
    int count;

    /*
     * 并发控制使用经典的双条件算法
     */

    /** 控制所有访问的锁 */
    final ReentrantLock lock;

    /** 等待取出元素的条件Condition for waiting takes */
    private final Condition notEmpty;

    /** 等待加入元素的条件 */
    private final Condition notFull;

其实到这里应该明白了,阻塞队列实现的原理就是加锁。

ArrayBlockingQueue的构造函数

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //根据传入的fair来决定是不是一个公平锁
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
  
    //传入一个集合用来初始化ArrayBlockingQueue
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); 
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

第一个构造器只有一个参数用来指定容量,第二个构造器可以指定容量和公平性,第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。

接下来看一下ArrayBlockingQueue的几个方法

ArrayBlockingQueue的put方法

 /**
 * 在队列的尾部插入元素,如果队列已满的话,会阻塞直到队列有可用空间。
 * @throws InterruptedException
 * @throws NullPointerException 
 */
public void put(E e) throws InterruptedException {
    //如果元素为空,抛出异常
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)//队列已满则进行等待
            notFull.await();//锁是可中断的,如果被中断,就抛出InterruptedException
            //注释1处,将数据加入队列
            enqueue(e);
    } finally {
        //释放锁
        lock.unlock();
    }
}

在注释1处,调用enqueue方法插入了元素。我们看一下enqueue方法。

ArrayBlockingQueue的enqueue方法,注意这是一个私有的方法!!!

/**
 * 在putIndex的位置插入元素,改变putIndex的值,然后发出队列不为空的信号。
 * 该方法只能在持有锁的情况下被调用
 */
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    //加入数据,如果到达了数组末尾,则重新指向数组的头,则下一次加入的元素从index=0开始
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //此时队列不为空,唤醒等待获取数据的线程
    notEmpty.signal();
}

ArrayBlockingQueue的offer方法

/**
 * 在队列尾部插入指定的元素,如果能够立即插入元素,返回true,如果队列已满返回false,表示插入失败。
 * 此方法通常优于add方法,因为add方法在队列已满的时候会抛出异常。
 */
public boolean offer(E e) {
    //元素为空抛出NullPointerException
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //队列已满,无法插入,返回false
        if (count == items.length)
            return false;
        else {
            //调用enqueue方法加入元素
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

ArrayBlockingQueue的add方法。该方法在父类AbstractQueue就实现了,内部调用offer方法实现插入操作,在插入失败的时候会抛出IllegalStateException。

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

ArrayBlockingQueue的poll方法

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列为空,返回null,否则返回dequeue方法的调用结果
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

ArrayBlockingQueue的dequeue方法

/**
 * 取出takeIndex位置上的元素,改变takeIndex的值,发出队列不满的信号。
 * 该方法只有在持有锁的情况下才能调用
 */
private E dequeue() {
     final Object[] items = this.items;
     //取出元素
     E x = (E) items[takeIndex];
     items[takeIndex] = null;
     //如果到达了数组的末尾,则重新指向数组的头部,下一次取元素从index=0的地方开始
     if (++takeIndex == items.length)
         takeIndex = 0;
     count--;
     if (itrs != null)
         itrs.elementDequeued();
     notFull.signal();
     return x;
}

ArrayBlockingQueue的take方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//锁可以被中断
    try {
        while (count == 0)//如果队列为空,则等待
            notEmpty.await();//锁是可中断的,如果被中断,抛出InterruptedException
        return dequeue();//返回dequeue方法的调用结果
    } finally {
        lock.unlock();
    }
}

结尾:开始以为阻塞队列应该很复杂,看了看源码,也不是太复杂,真正复杂的应该是ArrayBlockingQueue的迭代器部分,就不去看了,哈哈。

参考链接:

  1. Java并发编程:阻塞队列

相关文章

网友评论

      本文标题:ArrayBlockingQueue源码学习

      本文链接:https://www.haomeiwen.com/subject/vacupqtx.html