ArrayBlockingQueue源码分析

作者: 蓝梅 | 来源:发表于2021-06-13 00:12 被阅读0次

    今天看一看BlockingQueue阻塞队列,怎么来实现阻塞的;

    一、阻塞队列体系图

    image.png

    二、主要属性

    //元素数组
    final Object[] items;
     // 取元素的指针
    int takeIndex;
    // 放元素的指针
    int putIndex; 
    // 元素数量
    int count; 
    //集合锁
    final ReentrantLock lock;
    //非空条件等待队列
    private final Condition notEmpty;
    //未满条件等待队列
    private final Condition notFull;
    

    从上面主要属性我们大概可以看出,入队元素是放入数组中的,然后由取元素的指针指向队列头元素,放元素的指针指向队列尾元素;由锁来保证取放数据时的并发安全

    三、源码分析

    我们来跟着源码分析一遍,主要是看注释

    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            //初始化数组,容量为capacity
            this.items = new Object[capacity];
            //初始化锁,true为公平锁,false为非公平锁
            lock = new ReentrantLock(fair);
            //初始化两个条件队列
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    

    构造方法先初始化了一些主要元素,我们再看看put方法

    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            //入队时,先加锁,如果被打断会抛异常出来
            lock.lockInterruptibly();
            try {
                /*
                 *当队列已经放满时,则把当前线程放入,未满条件等待队列,挂起线程
                 *当线程被唤醒时,则再去看队列是否放满
                 *未放满,则元素入队,放满则再继续挂起
                 */
                while (count == items.length)
                    notFull.await();
                //当队列未满时,则把元素入队,
                enqueue(e);
            } finally {
                //添加完成释放锁
                lock.unlock();
            }
        }
    

    我们再看看take方法

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            //出队时,先加锁
            lock.lockInterruptibly();
            try {
                /*
                 *当队列为空时,则把该线程放入非空条件等待队列,挂起该线程
                 *当线程被唤醒时,则再去看队列是否非空
                 *有值,则出队,没有值,则再次挂起该线程
                 */
                while (count == 0)
                    notEmpty.await();
                //出队
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    到此,则主要逻辑分析完了,然后我们继续来看看,条件队列是怎么来挂起该线程的,本来应该是在上一节ReentrantLock源码分析中去讲该逻辑的,但是上次主要是讲ReentrantLock主流程,这次遇到了条件锁,那就正好来分析一遍;我们先来看看lock.newCondition() new出来的对象ConditionObject主要元素;

    //条件队列的头节点
    private transient Node firstWaiter;
    //条件队列的尾节点
    private transient Node lastWaiter;
    

    再来看看调用的await()方法

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //创建该条件队列的节点
            Node node = addConditionWaiter();
            //该方法很明显就是ReentrantLock释放锁的逻辑,这里就不分析了
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断该节点是否在sync队列中,不在则挂起该线程
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //唤醒后,会去修改waitStatus属性,然后把条件队列的节点,移入sync队列
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            /*当该节点被唤醒时,这个可以看下后面signal方法后再回到这里
             *因为唤醒后,该节点会被移入sync队列,所以才会跳出上面的循环
             *acquireQueued这个方法正好也是ReentrantLock抢占锁资源的方法,这里就不分析了
             */
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                //断开所有条件队列的指针,清空条件队列
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
        
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            //如果最后一个节点元素状态不是CONDITION,则说明该元素已经被移入sync队列,则清空该条件队列
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //新建一个CONDITION状态的节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                //队列为空时,条件队列的头结点,指向该节点
                firstWaiter = node;
            else
                //如果条件队列不为空,则把尾结点的下个元素指向该节点
                t.nextWaiter = node;
            //把尾节点指向该节点
            lastWaiter = node;
            return node;
        }
            
        final boolean isOnSyncQueue(Node node) {
            //当前节点状态为CONDITION,或者上个节点为空,则说明不在sync队列上
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            //如果节点sync队列的下个节点不为空,则说明,该节点在sync队列上
            if (node.next != null) // If has successor, it must be on queue
                return true;
            //遍历sync队列,看是否能在sync队列上找到该节点,找到则返回true,否则返回false
            return findNodeFromTail(node);
        }
        
        private int checkInterruptWhileWaiting(Node node) {
            //线程没有被打断时,调用transferAfterCancelledWait,修改状态,并且把节点都移入到sync队列中
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
        
        final boolean transferAfterCancelledWait(Node node) {
            //CAS去修改节点状态,从CONDITION,修改成初始状态
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                //从ReentrantLock中,我们知道这个是入队(sync队列)的操作
                enq(node);
                return true;
            }
            //该循环是为了防止同时有signal方法执行,上面抢占锁失败,必须等待该节点已经被移入到sync队列后才能跳出该方法
            while (!isOnSyncQueue(node))
                //线程让出CPU,等待下次执行
                Thread.yield();
            return false;
        }
        
        //从该方法看出,就是取消各种引用,就是当条件队列被移入sync队列后,所有的引用都被清空,好便于垃圾回收
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
    

    条件队列等待方法就是这些,我们继续接着来看入队方法enqueue;

    private void enqueue(E x) {
        final Object[] items = this.items;
        //把元素放入索引为‘放指针’的位置
        items[putIndex] = x;
        //放索引加一,当如果,指针已经到了数组尾部,则指针又回到数组头部,就是重置为0
        if (++putIndex == items.length)
            putIndex = 0;
        //队列元素长度加一
        count++;
        //当元素已经放入,则通知非空条件队列,唤醒该队列线程可以
        notEmpty.signal();
    }
    

    我们继续来看下signal通知方法

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //获取到条件队列的第一个节点,开始唤醒
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    
    private void doSignal(Node first) {
        do {
            //遍历整个条件队列,修改状态,然后入sync队列,直到没有下一个节点为止
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
        //使用cas修改waitStatus状态,从CONDITION,设置成初始状态,这个地方可能会和await方法发生竞争,但是一般不会
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //修改后,就把该节点移入到sync队列中
        Node p = enq(node);
        int ws = p.waitStatus;
        //该判断是在该线程被中断的情况下,修改后,会唤醒下个节点
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

    put方法的整个流程我们就分析完了,我们再来分析下,take方法的dequeue,出队方法

    private E dequeue() {
        final Object[] items = this.items;
        //获取数组下标为【取数据指针】  位置的数据
        E x = (E) items[takeIndex];
        //再把当前位置置位空,则出队完
        items[takeIndex] = null;
        //如果当前指针位置已经到了,数组尾,则需要跳到数组头,就相当一个圆
        if (++takeIndex == items.length)
            takeIndex = 0;
        //取出值后,再把队列长度减一
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //当元素出队后,通知未满条件队列,唤醒线程,signal方法已经分析过了,这里可以看下上面的逻辑,联系起来
        notFull.signal();
        return x;
    }
    

    四、总结

    该队列使用的是数组来存放数据,如果当数据放满后,使用put方法则,阻塞住该线程,把该线程移入到未满条件队列中,等待取数据后被唤醒;如果取数据时,队列为空,则把该线程放入非空条件队列,等待放数据后被唤醒;因为使用的是数组,用写指针和读指针来标记位置,如果到了队尾,则会重新跳到队头;相当于是一个环;

    相关文章

      网友评论

        本文标题:ArrayBlockingQueue源码分析

        本文链接:https://www.haomeiwen.com/subject/jxoieltx.html