美文网首页
PriorityBlockingQueue优先级阻塞队列

PriorityBlockingQueue优先级阻塞队列

作者: RealityVibe | 来源:发表于2021-01-26 15:36 被阅读0次

    PriorityBlockingQueue

    1、带优先级的无界阻塞队列,每次出队列都返回优先级最高或者最低的元素

    2、内部维护最小堆,使用平衡二叉树实现,直接遍历队列元素不保证有序。

    3、默认使用对象的compareTo方法比较,支持自定义comparators

    类图结构

    PrioripyBlockingQueue类图

    ​ PriorityBlockingQueue使用数组用来存储队列中的元素,元素在队列中根据坐标index呈现平衡二叉树的分布,以第i个元素为例,它的左孩子为2i+1,右孩子为2i+2,在类中常用位运算计算某个节点x的父节点(公式为: (x-1)>>1)。

    元素存储关系

    ​ 在实际情况中,由于内部维护最小堆,使用平衡二叉树实现,直接遍历队列元素不保证有序。如下图情况,在数组中元素并非严格按序存放,需要按右侧逻辑结构理解最小堆的思想。

    遍历数组不保证有序

    源码分析

    PriorityBlockingQueue

    添加元素

    ​ 由上图可知添加元素有add、put和offer方法。

    public void put(E e) {
        offer(e); // never need to block
    }
    public boolean add(E e) {
            return offer(e);
    }
    
    public boolean offer(E e) {
        // (1) 判断元素是否为null,null抛出异常
        if (e == null)
            throw new NullPointerException();
        // (2) 获取独占锁,注意一个队列共用一个独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // (3)如果容量已经达到峰值,需要扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
      
        try {
            Comparator<? super E> cmp = comparator;
            // (4)根据是否有自定义比较器选择方法
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            // (5)添加元素成功,唤醒notEmpty等待的消费线程
            notEmpty.signal();
        } finally {
            // (6) 释放独占锁
            lock.unlock();
        }
        return true;
    }
    

    ​ 而从源码易知put、add方法内部调用offer实现。所以添加元素的源码重点在于offer方法。从图中容易发现,与ArrayBlockingQueue不同,虽然同样使用数组作为存储结构,但是PriorityBlockingQueue永远从“结尾“添加元素(但不一定放在结尾),从前面出队列(出队列必取第一个元素)。

    tryGrow(Object[] array, int oldCap)

    private void tryGrow(Object[] array, int oldCap) {
        // (7)释放独占锁
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // (8)申请扩容锁 CAS乐观锁
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // (9)扩容容量算法
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // (10)
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
       // (11)
    }
    

    ​ 我们发现一进到扩容方法后,(7)处就调用了lock.unlock()释放独占锁,这样的好处是,在进行扩容时,其它线程可以获取到独占锁进行入队列、出队列的操作(因为出队列后,实际容量已经小于最大值,所以存在入队列的可能,但不妨碍之前扩容的线程继续扩容)。由于采取了CAS乐观锁的机制,也可以保证某时刻只有一个线程在进行扩容操作。而其它线程由于竞争失败,不断的自旋(自旋路线为(8)->(10)->(11)->(3)->(8))。

    ​ 在(10)处,竞争扩容锁失败的线程,会调用Thread.yield();尝试让出CPU时间片,不一定有效。

    siftUpComparable(int k, T x, Object[] array)

    源码注释:该方法用于将元素x尝试插入到队列k中,如果发现当前位置的父节点值大于元素x,则将元素x往上层提,直到x大于等于它的父节点,或者成为root根节点。

    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        // (0) key为目标元素的对应比较器
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            //(1)获取父节点位置和父节点元素的值
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            // (2)如果x的值更大,说明找到了合适的位置
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    

    ​ 用一个容易理解的例子来尝试理解这段代码,当一个年轻人来到公司,一开始处于组织架构树的末端,但是这个年轻人不讲武德,就找到所属小组的老领导偷袭一下,偷袭输了(key.compareTo((T) e) >= 0),老老实实呆在原位。如果偷袭成功(key.compareTo((T) e) < 0),让这个老领导担任年轻人原来的位置 array[k] = e;,然后新人取缔位置k = parent;继续偷袭,偷袭它的老领导的领导(k = parent, int parent = (k - 1) >>> 1,不断往上找父节点),以此类推,直到这个年轻人偷袭成功所有人,成为了大BOSS(root节点),或者被某个领导防御成功了(key.compareTo((T) e) >= 0),就在偷袭到的最高位置坐下。

    siftUpUsingComparator

    siftUpUsingComparator方法相比siftUpComparable只是比较器采用了自定义比较器,其它没有区别

    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }
    

    移除元素

    同样的从前面的图中我们可以看到移除元素的方法为poll、peek(不会移除)、take方法。

    peek

    由前文可知,PriorityBlockingQueue由一个独占锁维护,在peek调用之前会先获取独占锁。确认队列中是否有元素,有则返回index为0的元素,否则返回null。peek方法不会移除元素,只做查询

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (size == 0) ? null : (E) queue[0];
        } finally {
            lock.unlock();
        }
    }
    

    poll

    public E poll() {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
        return dequeue();
      } finally {
        lock.unlock();
      }
    }
    
    • dequeue
    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
    
    • siftDownComparable

    核心思想是从根节点开始总左右孩子中找到一个最小的节点,成为新的根节点,

    Object c = array[child]; // 左孩子

    int right = child + 1; // 右孩子索引

    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            // 如果k>=half已经超出到x原有的位置,没有必要
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child]; // 左孩子,c默认为左孩子节点
                int right = child + 1; // 右孩子索引
                // 右孩子索引在有效范围内,即是否存在 && 右孩子更小
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    // c = 右孩子节点
                    c = array[child = right];
                // 如果x小于等于于左(右)孩子,x成为根节点,这样变动最小
                // 如果x大于左(右)孩子,让左(右)孩子成为根节点,x取代他们原来的位置
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
    

    take

    take方法获取独占锁支持中断,

    如果队列中没有元素,等待唤醒。 while循环避免虚假唤醒

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            // 队列中没有元素,等待唤醒。 while循环避免虚假唤醒
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
    

    参考

    《Java并发编程实战》

    相关文章

      网友评论

          本文标题:PriorityBlockingQueue优先级阻塞队列

          本文链接:https://www.haomeiwen.com/subject/cvigzktx.html