ArrayBlockingQueue源码分析

作者: 蓝梅 | 来源:发表于2021-06-13 00:12 被阅读0次

今天看一看BlockingQueue阻塞队列,怎么来实现阻塞的;

一、阻塞队列体系图

image.png

二、主要属性

//元素数组
final Object[] items;
 // 取元素的指针
int takeIndex;
// 放元素的指针
int putIndex; 
// 元素数量
int count; 
//集合锁
final ReentrantLock lock;
//非空条件等待队列
private final Condition notEmpty;
//未满条件等待队列
private final Condition notFull;

从上面主要属性我们大概可以看出,入队元素是放入数组中的,然后由取元素的指针指向队列头元素,放元素的指针指向队列尾元素;由锁来保证取放数据时的并发安全

三、源码分析

我们来跟着源码分析一遍,主要是看注释

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        //初始化数组,容量为capacity
        this.items = new Object[capacity];
        //初始化锁,true为公平锁,false为非公平锁
        lock = new ReentrantLock(fair);
        //初始化两个条件队列
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

构造方法先初始化了一些主要元素,我们再看看put方法

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //入队时,先加锁,如果被打断会抛异常出来
        lock.lockInterruptibly();
        try {
            /*
             *当队列已经放满时,则把当前线程放入,未满条件等待队列,挂起线程
             *当线程被唤醒时,则再去看队列是否放满
             *未放满,则元素入队,放满则再继续挂起
             */
            while (count == items.length)
                notFull.await();
            //当队列未满时,则把元素入队,
            enqueue(e);
        } finally {
            //添加完成释放锁
            lock.unlock();
        }
    }

我们再看看take方法

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //出队时,先加锁
        lock.lockInterruptibly();
        try {
            /*
             *当队列为空时,则把该线程放入非空条件等待队列,挂起该线程
             *当线程被唤醒时,则再去看队列是否非空
             *有值,则出队,没有值,则再次挂起该线程
             */
            while (count == 0)
                notEmpty.await();
            //出队
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

到此,则主要逻辑分析完了,然后我们继续来看看,条件队列是怎么来挂起该线程的,本来应该是在上一节ReentrantLock源码分析中去讲该逻辑的,但是上次主要是讲ReentrantLock主流程,这次遇到了条件锁,那就正好来分析一遍;我们先来看看lock.newCondition() new出来的对象ConditionObject主要元素;

//条件队列的头节点
private transient Node firstWaiter;
//条件队列的尾节点
private transient Node lastWaiter;

再来看看调用的await()方法

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //创建该条件队列的节点
        Node node = addConditionWaiter();
        //该方法很明显就是ReentrantLock释放锁的逻辑,这里就不分析了
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //判断该节点是否在sync队列中,不在则挂起该线程
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            //唤醒后,会去修改waitStatus属性,然后把条件队列的节点,移入sync队列
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        /*当该节点被唤醒时,这个可以看下后面signal方法后再回到这里
         *因为唤醒后,该节点会被移入sync队列,所以才会跳出上面的循环
         *acquireQueued这个方法正好也是ReentrantLock抢占锁资源的方法,这里就不分析了
         */
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            //断开所有条件队列的指针,清空条件队列
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        //如果最后一个节点元素状态不是CONDITION,则说明该元素已经被移入sync队列,则清空该条件队列
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //新建一个CONDITION状态的节点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            //队列为空时,条件队列的头结点,指向该节点
            firstWaiter = node;
        else
            //如果条件队列不为空,则把尾结点的下个元素指向该节点
            t.nextWaiter = node;
        //把尾节点指向该节点
        lastWaiter = node;
        return node;
    }
        
    final boolean isOnSyncQueue(Node node) {
        //当前节点状态为CONDITION,或者上个节点为空,则说明不在sync队列上
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //如果节点sync队列的下个节点不为空,则说明,该节点在sync队列上
        if (node.next != null) // If has successor, it must be on queue
            return true;
        //遍历sync队列,看是否能在sync队列上找到该节点,找到则返回true,否则返回false
        return findNodeFromTail(node);
    }
    
    private int checkInterruptWhileWaiting(Node node) {
        //线程没有被打断时,调用transferAfterCancelledWait,修改状态,并且把节点都移入到sync队列中
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
    
    final boolean transferAfterCancelledWait(Node node) {
        //CAS去修改节点状态,从CONDITION,修改成初始状态
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //从ReentrantLock中,我们知道这个是入队(sync队列)的操作
            enq(node);
            return true;
        }
        //该循环是为了防止同时有signal方法执行,上面抢占锁失败,必须等待该节点已经被移入到sync队列后才能跳出该方法
        while (!isOnSyncQueue(node))
            //线程让出CPU,等待下次执行
            Thread.yield();
        return false;
    }
    
    //从该方法看出,就是取消各种引用,就是当条件队列被移入sync队列后,所有的引用都被清空,好便于垃圾回收
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }

条件队列等待方法就是这些,我们继续接着来看入队方法enqueue;

private void enqueue(E x) {
    final Object[] items = this.items;
    //把元素放入索引为‘放指针’的位置
    items[putIndex] = x;
    //放索引加一,当如果,指针已经到了数组尾部,则指针又回到数组头部,就是重置为0
    if (++putIndex == items.length)
        putIndex = 0;
    //队列元素长度加一
    count++;
    //当元素已经放入,则通知非空条件队列,唤醒该队列线程可以
    notEmpty.signal();
}

我们继续来看下signal通知方法

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //获取到条件队列的第一个节点,开始唤醒
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        //遍历整个条件队列,修改状态,然后入sync队列,直到没有下一个节点为止
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    //使用cas修改waitStatus状态,从CONDITION,设置成初始状态,这个地方可能会和await方法发生竞争,但是一般不会
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //修改后,就把该节点移入到sync队列中
    Node p = enq(node);
    int ws = p.waitStatus;
    //该判断是在该线程被中断的情况下,修改后,会唤醒下个节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

put方法的整个流程我们就分析完了,我们再来分析下,take方法的dequeue,出队方法

private E dequeue() {
    final Object[] items = this.items;
    //获取数组下标为【取数据指针】  位置的数据
    E x = (E) items[takeIndex];
    //再把当前位置置位空,则出队完
    items[takeIndex] = null;
    //如果当前指针位置已经到了,数组尾,则需要跳到数组头,就相当一个圆
    if (++takeIndex == items.length)
        takeIndex = 0;
    //取出值后,再把队列长度减一
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //当元素出队后,通知未满条件队列,唤醒线程,signal方法已经分析过了,这里可以看下上面的逻辑,联系起来
    notFull.signal();
    return x;
}

四、总结

该队列使用的是数组来存放数据,如果当数据放满后,使用put方法则,阻塞住该线程,把该线程移入到未满条件队列中,等待取数据后被唤醒;如果取数据时,队列为空,则把该线程放入非空条件队列,等待放数据后被唤醒;因为使用的是数组,用写指针和读指针来标记位置,如果到了队尾,则会重新跳到队头;相当于是一个环;

相关文章

网友评论

    本文标题:ArrayBlockingQueue源码分析

    本文链接:https://www.haomeiwen.com/subject/jxoieltx.html