Concurrent Containers: BlockingQueue - PriorityBloc

Author: 王侦 | Published: 2019-07-13 19:17

    1. Official documentation

    An unbounded blocking queue that uses the same ordering rules as 
    class PriorityQueue and supplies blocking retrieval operations. 
    While this queue is logically unbounded, attempted additions may 
    fail due to resource exhaustion (causing OutOfMemoryError). This 
    class does not permit null elements. A priority queue relying on 
    natural ordering also does not permit insertion of non-comparable 
    objects (doing so results in ClassCastException).
    
    This class and its iterator implement all of the optional methods of 
    the Collection and Iterator interfaces. The Iterator provided in 
    method iterator() is not guaranteed to traverse the elements of the 
    PriorityBlockingQueue in any particular order. If you need ordered 
    traversal, consider using Arrays.sort(pq.toArray()). Also, method 
    drainTo can be used to remove some or all elements in priority 
    order and place them in another collection.
    
    Operations on this class make no guarantees about the ordering of 
    elements with equal priority. If you need to enforce an ordering, you
    can define custom classes or comparators that use a secondary 
    key to break ties in primary priority values. For example, here is a 
    class that applies first-in-first-out tie-breaking to comparable 
    elements. To use it, you would insert a new FIFOEntry(anEntry) 
    instead of a plain entry object.
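
    The Javadoc passage quoted above goes on to show the FIFOEntry class it mentions. The sketch below follows that idea: each wrapper takes a sequence number on construction, and compareTo falls back to it when the wrapped elements compare as equal. Job and FifoDemo are hypothetical names added here purely for illustration.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Wraps an element so that entries of equal priority are ordered by insertion sequence.
class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> {
    static final AtomicLong seq = new AtomicLong(0);
    final long seqNum;
    final E entry;

    FIFOEntry(E entry) {
        this.seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    E getEntry() { return entry; }

    @Override
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry);
        // Tie on the primary key: the earlier sequence number wins.
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1);
        return res;
    }
}

// Hypothetical element type: compares by priority only, so ties are possible.
class Job implements Comparable<Job> {
    final int priority;
    final String name;

    Job(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public int compareTo(Job o) { return Integer.compare(priority, o.priority); }
}

class FifoDemo {
    // Returns the job names in the order the queue hands them out.
    static String drainNames() {
        PriorityBlockingQueue<FIFOEntry<Job>> q = new PriorityBlockingQueue<>();
        q.put(new FIFOEntry<>(new Job(1, "a")));
        q.put(new FIFOEntry<>(new Job(0, "urgent")));
        q.put(new FIFOEntry<>(new Job(1, "b")));
        q.put(new FIFOEntry<>(new Job(1, "c")));
        StringBuilder out = new StringBuilder();
        FIFOEntry<Job> e;
        while ((e = q.poll()) != null)
            out.append(e.getEntry().name).append(' ');
        return out.toString().trim();
    }
}
```

    With the wrapper in place, the three priority-1 jobs come out in the order they were inserted, after the priority-0 job.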
    

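    In short: this is an unbounded blocking priority queue that does not permit null elements.

    iterator() does not guarantee any particular traversal order.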
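
    The two workarounds named in the Javadoc can be sketched as follows. OrderedTraversalDemo and its method names are hypothetical; the example assumes elements with a natural ordering (for a queue built with a Comparator, the sort call would need that comparator as well).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class OrderedTraversalDemo {
    static PriorityBlockingQueue<Integer> sample() {
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
        pq.addAll(Arrays.asList(5, 1, 4, 2, 3));
        return pq;
    }

    // Option 1: copy the contents and sort the copy; the queue itself is left untouched.
    static Object[] sortedSnapshot(PriorityBlockingQueue<Integer> pq) {
        Object[] snapshot = pq.toArray();
        Arrays.sort(snapshot); // natural ordering of the elements
        return snapshot;
    }

    // Option 2: drainTo removes the elements in priority order and appends them to 'out'.
    static List<Integer> drained(PriorityBlockingQueue<Integer> pq) {
        List<Integer> out = new ArrayList<>();
        pq.drainTo(out);
        return out;
    }
}
```

    The first option is non-destructive but costs a copy and a sort; the second empties the queue as a side effect.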

    2. Methods that return a special value: offer and poll

        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }
    
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
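
    From the caller's side, the behaviour of the two methods above looks roughly like this. OfferPollDemo is a hypothetical name; the sketch only uses the public offer and poll methods.

```java
import java.util.concurrent.PriorityBlockingQueue;

class OfferPollDemo {
    static String run() {
        PriorityBlockingQueue<String> q = new PriorityBlockingQueue<>();
        StringBuilder log = new StringBuilder();
        log.append(q.poll()).append(';');     // empty queue: returns null without blocking
        log.append(q.offer("b")).append(';'); // unbounded queue: offer returns true
        log.append(q.offer("a")).append(';');
        log.append(q.poll()).append(';');     // smallest element by natural ordering
        try {
            q.offer(null);                    // null elements are rejected
            log.append("accepted");
        } catch (NullPointerException e) {
            log.append("NPE");
        }
        return log.toString();
    }
}
```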
    

    3. Blocking methods: put and take

    Because the queue is unbounded, put never needs to block.

        public void put(E e) {
            offer(e); // never need to block
        }
    
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
                while ( (result = dequeue()) == null)
                    notEmpty.await();
            } finally {
                lock.unlock();
            }
            return result;
        }
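
    A minimal sketch of the asymmetry between the two methods: the consumer blocks in take() on an empty queue until a producer calls put(). PutTakeDemo is a hypothetical name, and the 100 ms sleep is only a best-effort way to let the consumer reach take() first; the result does not depend on it.

```java
import java.util.concurrent.PriorityBlockingQueue;

class PutTakeDemo {
    static int run() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        int[] taken = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                taken[0] = q.take(); // waits on notEmpty while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            Thread.sleep(100);  // best effort: let the consumer block first
            q.put(42);          // returns immediately and signals notEmpty
            consumer.join();    // join makes the consumer's write to 'taken' visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken[0];
    }
}
```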
    

    4. Growing the array with an optimistic lock

        private void tryGrow(Object[] array, int oldCap) {
            lock.unlock(); // must release and then re-acquire main lock
            Object[] newArray = null;
            if (allocationSpinLock == 0 &&
                UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                         0, 1)) {
                try {
                    int newCap = oldCap + ((oldCap < 64) ?
                                           (oldCap + 2) : // grow faster if small
                                           (oldCap >> 1));
                    if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                        int minCap = oldCap + 1;
                        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                            throw new OutOfMemoryError();
                        newCap = MAX_ARRAY_SIZE;
                    }
                    if (newCap > oldCap && queue == array)
                        newArray = new Object[newCap];
                } finally {
                    allocationSpinLock = 0;
                }
            }
            if (newArray == null) // back off if another thread is allocating
                Thread.yield();
            lock.lock();
            if (newArray != null && queue == array) {
                queue = newArray;
                System.arraycopy(array, 0, newArray, 0, oldCap);
            }
        }
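
    The capacity arithmetic in tryGrow can be isolated to see how the array grows. GrowthPolicy and nextCapacity are hypothetical names; the value Integer.MAX_VALUE - 8 for MAX_ARRAY_SIZE and the default initial capacity of 11 are taken from the JDK 8 source as far as I recall and should be treated as assumptions.

```java
class GrowthPolicy {
    // Assumed to match the JDK 8 constant of the same name.
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // Same arithmetic as tryGrow: roughly double (+2) below 64 slots, grow by 50% above.
    static int nextCapacity(int oldCap) {
        int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
        if (newCap - MAX_ARRAY_SIZE > 0) { // overflow-aware comparison
            int minCap = oldCap + 1;
            if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                throw new OutOfMemoryError();
            newCap = MAX_ARRAY_SIZE;
        }
        return newCap;
    }

    public static void main(String[] args) {
        int cap = 11; // assumed default initial capacity
        for (int i = 0; i < 5; i++) {
            System.out.print(cap + " -> ");
            cap = nextCapacity(cap);
        }
        System.out.println(cap); // 11 -> 24 -> 50 -> 102 -> 153 -> 229
    }
}
```

    Writing the check as newCap - MAX_ARRAY_SIZE > 0 rather than newCap > MAX_ARRAY_SIZE also catches the case where newCap has already wrapped around to a negative int.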
    

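    tryGrow first releases lock and then takes allocationSpinLock, a simple optimistic (CAS-based) lock, so that other threads can keep using the array while the new one is being allocated. This improves concurrency.

    If another thread is already growing the array, the current thread backs off and simply calls yield(). The spin lock uses CAS to ensure that only one thread performs the allocation; a thread whose CAS fails calls Thread.yield() to give up the CPU, with the aim of letting the growing thread call lock.lock() and re-acquire the lock first. Thread.yield() is only a hint to the scheduler, so this ordering is not guaranteed.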
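
    The same release, CAS-guarded allocation, re-acquire pattern can be sketched outside the JDK. GrowableBuffer and GrowDemo are hypothetical; an AtomicInteger stands in for the Unsafe-based allocationSpinLock, the array field is declared volatile so the unlocked pre-check is well defined, and overflow handling is omitted. This is a simplified illustration, not the JDK implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical append-only buffer that grows the way tryGrow does.
class GrowableBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicInteger allocating = new AtomicInteger(0); // 1 while a thread allocates
    private volatile Object[] items = new Object[2];
    private int size; // guarded by lock

    void add(Object x) {
        lock.lock();
        Object[] array;
        int cap;
        // Re-check after every tryGrow: another thread may have grown or filled the array.
        while (size >= (cap = (array = items).length))
            tryGrow(array, cap); // returns with the lock held again
        try {
            items[size++] = x;
        } finally {
            lock.unlock();
        }
    }

    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // let other threads use the buffer while we allocate
        Object[] newArray = null;
        if (allocating.compareAndSet(0, 1)) { // only one thread allocates at a time
            try {
                int newCap = oldCap + (oldCap < 64 ? oldCap + 2 : oldCap >> 1);
                if (items == array) // cheap pre-check; the check under the lock is authoritative
                    newArray = new Object[newCap];
            } finally {
                allocating.set(0);
            }
        }
        if (newArray == null)
            Thread.yield(); // someone else is allocating: back off
        lock.lock();
        if (newArray != null && items == array) { // install only if nobody grew it meanwhile
            System.arraycopy(array, 0, newArray, 0, oldCap);
            items = newArray;
        }
    }

    int size() {
        lock.lock();
        try {
            return size;
        } finally {
            lock.unlock();
        }
    }
}

class GrowDemo {
    // Four threads append 1000 items each; returns the final element count.
    static int run() {
        GrowableBuffer buf = new GrowableBuffer();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++)
                    buf.add(j);
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return buf.size();
    }
}
```

    The key points are that the potentially slow allocation happens without the main lock held, and that the new array is installed only after re-acquiring the lock and confirming that the array has not been replaced in the meantime.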
