美文网首页
juc5-PriorityBlockingQueue与Delay

juc5-PriorityBlockingQueue与Delay

作者: modou1618 | 来源:发表于2019-02-03 14:18 被阅读0次

一 PriorityBlockingQueue

  • 与PriorityQueue相比就是访问时多了锁的并发控制

1.1 扩容

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // 先释放锁
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                    0, 1)) {//扩容时allocationSpinLock加锁
        try {//小于64时翻倍,否则增减50%
            int newCap = oldCap + ((oldCap < 64) ?
                    (oldCap + 2) : // grow faster if small
                    (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // 其他线程在执行扩容
        Thread.yield();
    lock.lock();//加锁后包括原数据到新存储数组中
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

1.2 入队

  • 与LinkedBlockingQueue和ArrayBlockingQueue不同,可扩容,所以不会用空间不足需要等待的情况
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
//尝试扩容,其他线程在扩容则想让出cpu,等会再检查容量是否足够
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)//使用原生比较函数比较
            siftUpComparable(n, e, array);
        else//使用配置的比较函数比较大小
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {//从索引k开始,逐层向上比较大小,
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

1.3 出队

  • 存在队列为空时,需要等待的情况。入队不需要等待。所以只有一个条件对象notEmpty
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];//出队索引0节点
        E x = (E) array[n];//末尾索引节点放到索引0
        array[n] = null;
        Comparator<? super E> cmp = comparator;
//调整最小堆,使索引0节点到正确位置
        if (cmp == null)//使用默认比较函数
            siftDownComparable(0, x, array, n);
        else//使用用户自定义比较函数
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {//从索引k开始逐层向下和孩子节点比较,
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)//左右孩子比较,选出较小的
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)//较小孩子和目标节点值比较
                break;//比孩子小,则赋值给当前节点,
            array[k] = c;//否则较小孩子赋值当前节点,继续向下比较
            k = child;
        }
        array[k] = key;
    }
}

二 DelayQueue

  • private final PriorityQueue<E> q = new PriorityQueue<E>();优先级队列存储数据

2.1 入队

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);//入队
        if (q.peek() == e) {//本次入队的是优先级最高的
            leader = null;//新入队可能优先级更高,设置lead为null
            available.signal();//通知等待线程重新检查获取有数据可用
        }
        return true;
    } finally {
        lock.unlock();
    }
}

2.2 出队

  • poll()返回null或可用元素
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();//获取优先级最高元素
        if (first == null || first.getDelay(NANOSECONDS) > 0)//空队列,或元素的延迟时间大于0,则不返回
            return null;
        else
            return q.poll();//放回优先级最高元素
    } finally {
        lock.unlock();
    }
}
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)//无数据,则休眠等待
                available.await();
            else {//有数据,检查延迟时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)//时间到期,则数据出队返回
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)//leader不为null,有线程在等待该节点则休眠等待
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {//休眠优先级最高节点的延迟时间,配合leader为优先级最高节点被等待的线程。
                        available.awaitNanos(delay);
                    } finally {//休眠完成,进入循环,节点出队,返回数据
                        if (leader == thisThread)
                            leader = null;//设置lead为null
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();//还有数据通知其他线程唤醒处理
        lock.unlock();
    }
}

三 差别

区别 PriorityBlockingQueue DelayQueue
存储 数组存储,内部实现最小堆功能 PriorityQueue存储,使用类接口
扩容 支持 支持
并发锁 支持 支持
优先级 队列中位置 队列中位置,同时表示节点数据需要等待多久之后才可被访问
条件对象 取数据,队列为空是等待 取数据队列为空时等待,有数据但延迟时间未到时超时等待。

相关文章

网友评论

      本文标题:juc5-PriorityBlockingQueue与Delay

      本文链接:https://www.haomeiwen.com/subject/gjmysqtx.html