美文网首页
多线程并发编程14-PriorityBlockingQueue源

多线程并发编程14-PriorityBlockingQueue源

作者: Demo_zfs | 来源:发表于2020-03-18 22:47 被阅读0次

        PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或最低的元素。其内部使用平衡二叉树堆实现的,所以遍历队列元素不能保证有序性。默认使用对象的compareTo方法进行比较,也可以自定义comparators。

        PriorityBlockingQueue内部有一个数组用来存放队列元素,在前面介绍的ArrayBlockingQueue类中也有一个数组存放队列元素,为什么ArrayBlockingQueue是有界队列而PriorityBlockingQueue是无界队列呢?因为在PriorityBlockingQueue内部会对存放队列元素的数据进行扩容,扩容要保证只能一个线程进行,所以PriorityBlockingQueue内部有一个自旋锁 allocationSpinLock,其使用CAS操作来保证只有一个线程可以扩容。   PriorityBlockingQueue类中还有一个ReentrantLock对象锁,队列的读写操作需要获取该对象。由于是无界队列生成元素并不受限制,但是队列为空时消费数据会被限制(阻塞),所以PriorityBlockingQueue内部只有一个条件变量来实现消费模式。

        PriorityBlockingQueue内部主要的成员变量:

    private transient Object[]queue;

    private transient volatile int allocationSpinLock;

    private final ReentrantLocklock;

    private final ConditionnotEmpty;

        下面对主要函数原理进行讲解。

    offer(E e)

        offer方法向队列中插入一个元素,由于是无界队列,所以插入操作总是返回true。

    public boolean offer(E e) {

        if (e == null)

            throw new NullPointerException();

    //(1)尝试获取独占锁对象。

        final ReentrantLock lock = this.lock;

        lock.lock();

        int n, cap;

        Object[] array;

    //(2)判断队列是否需要进行扩容。

        while ((n = size) >= (cap = (array = queue).length))

            tryGrow(array, cap);

        try {

            Comparator<? super E> cmp = comparator;

    //(3)使用默认对比器或自定义对比器进行建二叉树堆

            if (cmp == null)

                siftUpComparable(n, e, array);

            else

                siftUpUsingComparator(n, e, array, cmp);

            size = n + 1;

    //(4)通知因为队列为空而阻塞的消费者可以进行获取数据。

            notEmpty.signal();

        } finally {

    //(5)释放锁。

            lock.unlock();

        }

        return true;

    }

    tryGrow(Object[] array, int oldCap)

        对队列进行扩容,使用自旋锁和CAS算法保证只有一个线程能进行扩容。

    private void tryGrow(Object[] array, int oldCap) {

    //(1)释放获取的独占锁,在进行扩容的过程让别的线程也可以获取到该锁。

        lock.unlock(); 

        Object[] newArray = null;

    //(2)CAS成功则进行扩容。

        if (allocationSpinLock == 0 &&

            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

                                    0, 1)) {

            try {

    //(3)进行扩容,oldGap<64则增加oldCap+2,否则增加oldCap的一半,并且容量最大值为MAX_ARRAY_SIZE。从这来看虽然队列会进行扩容,但也不是无限扩容,严格来说也应该算是有界的。

                int newCap = oldCap + ((oldCap < 64) ?

                                      (oldCap + 2) : // grow faster if small

                                      (oldCap >> 1));

                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow

                    int minCap = oldCap + 1;

                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

                        throw new OutOfMemoryError();

                    newCap = MAX_ARRAY_SIZE;

                }

                if (newCap > oldCap && queue == array)

                    newArray = new Object[newCap];

            } finally {

        //(4)扩完容之后将自旋锁allocationSpinLock  设置为0,允许下次进行扩容。

                allocationSpinLock = 0;

            }

        }
    //(5)第一个线程CAS成功后,第二个线程会进入这段代码,然后第二个线程会让出cpu,尽量让第一个线程获取锁,但这不保证一定可以。

        if (newArray == null) // back off if another thread is allocating

            Thread.yield();

    //(6)获取锁,将原来队列中的元素拷贝到扩容后的队列中。

        lock.lock();

        if (newArray != null && queue == array) {

            queue = newArray;

            System.arraycopy(array, 0, newArray, 0, oldCap);

        }

    }

    poll()

        获取队列内部堆树的根节点元素,如果队列为空则返回null。

    public E poll() {

    //(1)尝试获取锁。

        final ReentrantLock lock = this.lock;

        lock.lock();

        try {

    //(2)获取队列的第一个元素,并整理二叉树堆。

            return dequeue();

        } finally {

    //(3)释放锁。

            lock.unlock();

        }

    }

    put()

        由于是无界队列,不需要阻塞,put方法内部调用的offer方法,这就不进行赘述了。

    public void put(E e) {

        offer(e); // never need to block

    }

    take()

        获取队列内部堆树的根节点元素,如果队列为空则阻塞。

    public E take() throws InterruptedException {

    //(1)尝试获取锁对象,调用的是lockInterruptibly方法,所以在当其他线程设置了中断标志,该线程会抛出InterruptedException异常。

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();

        E result;

        try {

    //(2)如果队列为空,则阻塞,停止消费。

            while ( (result = dequeue()) == null)

                notEmpty.await();

        } finally {

    //(3)释放锁。

            lock.unlock();

        }

        return result;

    }

    size()

        size方法需要获取锁,因为本类中的size变量没有volatile变量修饰无法保证内存的可见性。

    public int size() {

        final ReentrantLock lock = this.lock;

        lock.lock();

        try {

            return size;

        } finally {

            lock.unlock();

        }

    }

        PriorityBlockingQueue队列在内部使用二叉树堆维护元素的优先级,使用数组作为元素的存储的数据结构,该数组可进行扩容,但是容量也是有限制的,使用CAS来保证扩容时的唯一性。

         今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

    相关文章

      网友评论

          本文标题:多线程并发编程14-PriorityBlockingQueue源

          本文链接:https://www.haomeiwen.com/subject/tlhhyhtx.html