美文网首页程序员
Java 优先队列 PriorityQueue Priority

Java 优先队列 PriorityQueue Priority

作者: 被称为L的男人 | 来源:发表于2019-01-04 10:55 被阅读90次

    基本使用

    @Test
    public void testPriorityQueue() throws InterruptedException {
        PriorityQueue priorityQueue = new PriorityQueue(Lists.newArrayList(5, 4, 2, 1, 3));
        System.out.println(priorityQueue);
        System.out.println(priorityQueue.poll());
        System.out.println(priorityQueue.poll());
    
        PriorityBlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
        blockingQueue.add(5);
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.take());
    }
    

    输出

    [1, 3, 2, 4, 5]
    1
    2
    5
    (阻塞)
    

    PriorityQueue

    成员变量

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    transient Object[] queue; // non-private to simplify nested class access
    
    /**
     * The number of elements in the priority queue.
     */
    private int size = 0;
    
    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private final Comparator<? super E> comparator;
    
    /**
     * The number of times this priority queue has been
     * <i>structurally modified</i>.  See AbstractList for gory details.
     */
    transient int modCount = 0; // non-private to simplify nested class access
    

    通过数组实现一个堆,元素在queue数组中并不是完全有序的,仅堆顶元素最大或最小。

    基本方法

    public E poll() {
        if (size == 0)
            return null;
        int s = --size;
        modCount++;
        E result = (E) queue[0];
        E x = (E) queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return result;
    }
    
    /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     * @param k the position to fill
     * @param x the item to insert
     */
    private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }
    
    @SuppressWarnings("unchecked")
    private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        int half = size >>> 1;        // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo((E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = key;
    }
    

    以poll方法为例,实际上是获取堆顶元素,然后调整堆。

    调整堆的方法(以大顶堆为例):

    1. 判断是否传入comparator,有则按照comparator排序,否则按照自然顺序排序
    2. 取节点左右孩子节点最大值,与父亲节点交换

    扩容方法

    /**
     * Increases the capacity of the array.
     *
     * @param minCapacity the desired minimum capacity
     */
    private void grow(int minCapacity) {
        int oldCapacity = queue.length;
        // Double size if small; else grow by 50%
        int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                         (oldCapacity + 2) :
                                         (oldCapacity >> 1));
        // overflow-conscious code
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        queue = Arrays.copyOf(queue, newCapacity);
    }
    
    private static int hugeCapacity(int minCapacity) {
        if (minCapacity < 0) // overflow
            throw new OutOfMemoryError();
        return (minCapacity > MAX_ARRAY_SIZE) ?
            Integer.MAX_VALUE :
            MAX_ARRAY_SIZE;
    }
    
    1. 小容量扩容1倍
    2. 大容量扩容0.5倍
    3. 快溢出时调整为Integer.MAX_VALUE - 8 或 Integer.MAX_VALUE

    是否线程安全

    非线程安全

    PriorityBlockingQueue

    其实现基本与PriorityQueue一致,不过PriorityBlockingQueue是线程安全的,并且实现了BlockingQueue接口,在队列为空时take会阻塞。

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;
    
    /**
     * The number of elements in the priority queue.
     */
    private transient int size;
    
    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;
    
    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;
    
    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;
    
    /**
     * Spinlock for allocation, acquired via CAS.
     */
    private transient volatile int allocationSpinLock;
    
    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */
    private PriorityQueue<E> q;
    

    和PriorityQueue的区别:增加了

    1. 重入锁ReentrantLock
    2. Condition,用于队列空情况下的阻塞
    3. allocationSpinLock,通过CAS手段对queue扩容
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
    

    可以看到与PriorityQueue的扩容函数很像,不同点:

    1. 调用函数时必须持有锁
    2. 使用CAS方法进行扩容,在allocationSpinLock为0,并且CAS将其置为1时,线程才能够对数组进行扩容。如果多个线程并发扩容,其余线程会调用Thread.yield()方法。

    为什么这样实现PriorityBlockingQueue扩容?

    因为PriorityBlockingQueue内部使用的ReentrantLock重入锁,同一个线程多次调用add函数,可能恰好同时调用了tryGrow函数。此时通过重入锁是无法加锁的,仅能通过Synchronized或CAS方式控制并发。

    allocationSpinLock是transient的,因为序列化时并不需要此参数;同时又是volatile的,因为可能有多个线程同时调用。

    private transient volatile int allocationSpinLock;
    

    UNSAFE.compareAndSwapInt

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long allocationSpinLockOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = PriorityBlockingQueue.class;
            allocationSpinLockOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("allocationSpinLock"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    

    调用方法

    UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)
    

    allocationSpinLockOffset是allocationSpinLock变量在PriorityBlockingQueue类中的偏移量。

    那么使用allocationSpinLockOffset有什么好处呢?它和直接修改allocationSpinLock变量有什么区别?

    获取该字段在类中的内存偏移量,直接将内存中的值改为新值。直接修改allocationSpinLock并不是CAS。JDK 1.8代码如下:

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
        return var5;
    }
    

    在AtomicInteger类中的调用如下,getAndAddInt方法由具体类的实现方法,抽取到了UNSAFE类中:

    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }
    

    对比 PriorityQueue 和 PriorityBlockingQueue

    1. PriorityQueue是非线程安全的,PriorityBlockingQueue是线程安全的
    2. PriorityBlockingQueue使用重入锁,每一个操作都需要加锁
    3. PriorityBlockingQueue扩容时使用了CAS操作
    4. 两者都使用了堆,算法原理相同
    5. PriorityBlockingQueue可以在queue为空时阻塞take操作

    JDK实现堆的方法

    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     */
    @SuppressWarnings("unchecked")
    private void heapify() {
        for (int i = (size >>> 1) - 1; i >= 0; i--)
            siftDown(i, (E) queue[i]);
    }
    
    private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }
    
    @SuppressWarnings("unchecked")
    private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        int half = size >>> 1;        // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo((E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = key;
    }
    
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }
    
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }
    
    @SuppressWarnings("unchecked")
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
    

    相关文章

      网友评论

        本文标题:Java 优先队列 PriorityQueue Priority

        本文链接:https://www.haomeiwen.com/subject/bsrhrqtx.html