美文网首页
2.BlockingQueue综合分析

2.BlockingQueue综合分析

作者: 致虑 | 来源:发表于2020-09-11 16:39 被阅读0次

BlockingQueue

BlockingQueue是一个线程安全的阻塞队列,一般是FIFO(先进先出),是各种具体队列实现的接口,适用于Java编程中生产者-消费者场景,可以构造有界队列及无界队列。提供了添加元素的接口方法add(), put(), offer(),获取元素方法take(), poll() ,具体的队列实现,我们看下文。

BlockingQueue最合适的使用场景就是为线程间执行提供同步手段,作为线程执行数据的存放容器。

其实这里的无界队列容量也会默认填充一个最大值Integer.MAX_VALUE,下文案例中会有代码体现

1、ArrayBlockingQueue

ArrayBlockingQueue内部是利用数组实现的线程安全的有界阻塞队列,FIFO(先进先出)模式添加删除元素。内部通过ReentrantLock两个Condition(notEmpty、notFull)实现线程安全。但这里ReentrantLock有公平与非公平之分,保证等待的线程获取锁的一个顺序是否是竞争还是按序进行,这里很好理解。至于ReentrantLock在锁一章节中会详细介绍。

两个Condition(notEmpty、notFull)分别维护等待take(获取元素)和等待put(插入元素)的线程队列,内部也是单链表实现结构,此内容细节也会在Condition一节会有详细介绍,此处做个简单的了解。

两个Condition之间相互唤醒:

final ReentrantLock lock;          // 全局锁
private final Condition notEmpty;  // 等待获取元素的线程队列
private final Condition notFull;   // 等待插入元素的线程队列
  • 队列为空时,notEmpty等待,一旦插入元素了,唤醒notEmpty中线程
  • 队列满时,notFull等待,一旦取出元素了,唤醒notFull中线程
ArrayBlockingQueue

图中takeIndex与putIndex分别向后移动,保证先进先出。当putIndex移动到index=Array.length长度时,重新指向index=0,因为通过count能够知道前面已经被消费了,可以重新插入数据了。这个时候,takeIndex指向的并不是index=0的位置,因此也需要等到takeIndex重新回指导index=0的时候才能被消费,保证了FIFO。

现在分别看两段底层插入代码片段就一目了然了(其他的出队与入队操作最终都是调用的这里,当然peek除外)。

        /** 入队操作,此处会进行加锁及等待以保证线程安全 */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 若队列已满,阻塞等待
            while (count == items.length)
                notFull.await();
          
            // 调用最终的入队操作
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

        /** 入队操作,此处会进行加锁及等待以保证线程安全 */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 若队列为空,阻塞等待
            while (count == 0)
                notEmpty.await();
          
            // 调用最终的出队操作
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

        /** 入队操作,私有方法 */
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x; // 将元素插入对应的putIndex位置
      
        // 如果此时下一个putIndex==队列长度,则重新回指导队列首
        // 此处如何保证队列首元素一定被出队了呢,结合分析上面出发方法put里有一个 while (count == items.length)的逻辑判断保证了这一点
        if (++putIndex == items.length) 
            putIndex = 0;
        count++;
        notEmpty.signal(); // 唤醒notEmpty队列中的线程
    }

    /** 出队操作,私有方法 */
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        
      // 如果此时下一个putIndex==队列长度,则重新回指导队列首,此处也就是先消费了整个队列的元素才能回指,因此也保证了FIFO
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal(); // 唤醒notEmpty队列中的线程
        return x;
    }

因为继承了Collection,所以ArrayBlockingQueue也实现了自己的迭代器Itrs

2、PriorityBlockingQueue

PriorityBlockingQueue是一个利用数组(二叉堆)实现的线程安全的无界优先级阻塞队列,这里有两个个核心的概念:二叉堆、优先级

不管三七二十一,先看一段代码执行的结果

@Test
public void testOne(){
    PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue(50, new Comparator<Integer>() {
        @Override
        public int compare(Integer o1, Integer o2) {
            return o1-o2;
        }
    });
    queue.add(50);
    queue.add(10);
    queue.add(3);
    queue.add(7);
    queue.add(50);
    queue.add(49);
    queue.add(23);
    queue.add(12);
    queue.add(19);
    queue.add(70);
    queue.add(199);
    queue.add(19);
    for(Object i : queue.toArray()){
    System.out.print(i + ", ");
    }
}

# 输出:
3, 7, 10, 12, 50, 19, 23, 50, 19, 70, 199, 49

从以上测试及结果分析:

构造队列时指定了容量为50,优先比较规则:o1与o2大小;初步看上去输入无序,输出也无序

其实这里输出正是满足了二叉堆特性。那下面先大致介绍一下二叉堆,这有利于后面分析PriorityBlockingQueue

二叉堆

二叉堆本质是完全二叉树的一种,只不过它保持着“父节点要么大于子节点 或者 父节点要么小于子节点”的特性,对于二叉堆任何层级的节点都满足该特性,即全局如此。

那么依然看下上面的输出结果:

3, 7, 10, 12, 50, 19, 23, 50, 19, 70, 199, 49

该结构在PriorityBlockingQueue中就是一个数组,用数组描叙就是:

queue[0] = 3, queue[1] = 7, queue[2] = 10, queue[3] = 12, queue[4] = 50, queue[5] = 19, queue[6] = 23, queue[7] = 50, queue[8] = 19, queue[9] = 70, queue[10] = 199, queue[11] = 49

那么跟二叉堆有啥关系呢,其实PriorityBlockingQueue就是利用数组存储的二叉堆,我们继续下面操作

将其直接构造成二叉堆结构就如下(傻瓜式平铺就好了):

PriorityBlockingQueue-二叉堆

这里就能很自然的看出来该二叉树满足“父节点要么小于子节点”特性,(当然如果将上叙程序改变下比较规则:O2-O1,那么构造出来的二叉堆就会是满足“父节点要么大于子节点”另一个规则了)。

其实上面说的数组存储二叉堆,从上面二叉树结构已经找出他们的关系了,那就是:二叉堆节点在数组中的位置k(k是节点在数组中的下标), 对应的子节点在数组中的位置对应是 2k + 1 和 2k + 2

其实只要是有一定规则的二叉树,当插入一个元素的时候,为了保持规则,都需要进行一定的调整,因为二叉堆无非满足两种规则,因此也就涉及两种调整:向上调整 与 向下调整,这里对应到程序就是heapify()方法里的siftDownComparable和siftDownUsingComparator了。

/** 构造二叉堆 */
private void heapify() {
        Object[] array = queue;
        int n = size;
            // n指数组中的下标, >>> 或 <<< 表示数据在进行无符号的左右移动, 每移动一位就等于整除2
        int half = (n >>> 1) - 1;
        Comparator<? super E> cmp = comparator;
        if (cmp == null) {
            for (int i = half; i >= 0; i--)
                // 从最后一颗树开始, 将二叉树的最小值放置在parent位置, 直到全局都如此
                siftDownComparable(i, (E) array[i], array, n);
        }
        else {
            for (int i = half; i >= 0; i--)
                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
        }
}

二叉堆简单介绍这么多,已经不影响后面PriorityBlockingQueue相关内容了。

回到PriorityBlockingQueue

上面把PriorityBlockingQueue核心结构二叉堆介绍了,这里直接从PriorityBlockingQueue的出队、入队方法进行一步步分析讲解

1、入队
public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;  // 获取锁,保证安全
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length)) // 判断是否需要扩容
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;  // 获取比较器,若无自定义,取默认
            if (cmp == null)
                siftUpComparable(n, e, array);  // 向上调整
            else
                siftUpUsingComparator(n, e, array, cmp); // 利用自定义比较器向上调整
            size = n + 1;
            notEmpty.signal();   // condition唤醒队列中等待的其他线程
        } finally {
            lock.unlock();
        }
        return true;
}

这里简要步骤如下:

  • 1、获取全局锁,以保障原子操作
  • 2、入队前,判断是否需要扩容
  • 3、根据比较器进行元素的向上调整,此处就是对二叉堆进行调整
  • 4、对condition队列中等待的线程进行唤醒
2、扩容
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // 先释放锁
        Object[] newArray = null;
      
        // CAS方式获取乐观锁(此处释放全局锁,获取乐观锁,主要是为了保证扩容操作的性能)
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
            try {
              
                // 小于64,+2,否则 >>1
                int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : (oldCap >> 1));
                // 若扩容后超过最大配置,则退一步
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                  
                    // 做个校验保证
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE){
                      throw new OutOfMemoryError();
                    }
       
                    // 直接设置最大值
                    newCap = MAX_ARRAY_SIZE;
                }
              
                // 若此时数组还未发生变化(因为其他线程也可能在操作queue),则直接新建newArray
                if (newCap > oldCap && queue == array){
                    newArray = new Object[newCap];
                }     
            } finally {
                allocationSpinLock = 0; // 释放乐观锁
            }
        }
        // 如果newArray为空,说明其他线程操作了queue,而这也必然触发扩容,因此让出cpu
        // 其实这里有点不好理解,如果newArray==null,说明上面(newCap > oldCap && queue == array)条件不成立,即代表有其他线程操作了queue,而这里其他线程操作queue时候,必然也会遇到当前线程一样的需要扩容操作,因此就直接让出cpu即可。
        if (newArray == null) 
            Thread.yield();
      
        // 否则重新获取锁,进行扩容后的数据copy
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

其实扩容操作最核心的逻辑就是释放全局锁,获取乐观锁,在判断queue是否有其他线程在操作,只有这些都完成之后,在最后才去竞争全局锁,去自己扩容并拷贝原来的数据。

3、siftUpComparable向上调整

比较当前位置K插入值X,与其parant大小,如果过k<parent,则进行调整; 直到 K>=parent为止;

siftDownComparable逻辑相反,此处就不再介绍。

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)  // 若X>parent,则停止,否则进行siftUp处理
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}
4、出队
  public E poll() {
        final ReentrantLock lock = this.lock;  // 获取锁
        lock.lock();
        try {
            return dequeue();   // 出队,获取队列首元素
        } finally {
            lock.unlock();
        }
   }
   private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];   // 获取并返回第一个元素
            E x = (E) array[n];        // 获取最后一个元素
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            
            // 将获取的最后一个元素放到首位置, 然后进行siftDown堆化操作
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

其实上面注释已经很清楚了,这里只需要理解两个核心步骤即可:

  • 取出队列首元素
  • 将最后一个元素放置到首元素位置后,进行二叉堆的堆化操作(这里是向下堆化,即直到 K<=parent为止)。

其他remove、take等操作都类似,主要记住一点就是:只要队列元素有变化,就必须对二叉堆进行相应的堆化操作,以保证结构。

5、场景

很明显支持优先队列的场景,即入队元素满足一定的优先级,按此优先级保证消费顺序。

3、DelayQueue

看名字DelayQueue就知道是延迟队列,但是它内部不是直接通过数组或者链表,而是直接用了PriorityQueue作为自身存储结构,而PriorityQueue采用的是数组结构存储,因此DelayQueue内部也是采用数组存储。通过这么简单的解释就能清楚它有两个特性:延迟+优先级

那么理解<u>延迟+优先级</u>的在其中的作用就比较重要了:

  • 延迟:延迟消费的意思,即从队列中取出元素需要校验<u>延迟时间</u>,满足了才返回。

  • 优先级:按照一定的比较逻辑(此处一般是延迟时间)进行顺序存放,其实这里采用的是二叉堆结构,不理解二叉堆的可以在PriorityBlockingQueue一节中了解。这样才能保证每次取出的元素必然是最靠近延迟时间到达的元素,才能保证消费按照延迟序列进行。

话不多说,线上demo(大部分时候一个简单的demo就能胜过千言万语,无言胜有言嘛)

public class DelayQueueTest {
    public static void main(String[] args) {
        DelayQueue<Item> delayQueue = new DelayQueue<>();
        delayQueue.add(new Item("a", 2L, System.currentTimeMillis()));
        delayQueue.add(new Item("b", 1L, System.currentTimeMillis()));
        delayQueue.add(new Item("c", 3L, System.currentTimeMillis()));
        delayQueue.add(new Item("d", 6L, System.currentTimeMillis()));
        delayQueue.add(new Item("e", 1L, System.currentTimeMillis()));
        delayQueue.add(new Item("f", 4L, System.currentTimeMillis()));
        delayQueue.add(new Item("g", 10L, System.currentTimeMillis()));
        delayQueue.add(new Item("h", 9L, System.currentTimeMillis()));
       
        print(delayQueue);
        new Thread(()-> consume(delayQueue),"消费线程1").start();
        new Thread(()-> consume(delayQueue),"消费线程2").start();
        new Thread(()-> consume(delayQueue),"消费线程3").start();
    }

    private static void print(DelayQueue<Item> delayQueue) {
        Iterator<Item> it = delayQueue.iterator();
        System.out.print("二叉堆数组结构: ");
        while(it.hasNext()){
            System.out.print( it.next().key + ",");
        }
    }

    private static void consume(DelayQueue<Item> delayQueue) {
        while (delayQueue.size() > 0) {
            try {
                System.out.println(Thread.currentThread().getName() + ": " + delayQueue.take().getKey());
            } catch (InterruptedException e) {
            }
        }
    }

    @AllArgsConstructor
    @Data
    static class Item implements Delayed{
        private String key;
        private long exprieTime;
        private long beginTime;

        @Override
        public long getDelay(TimeUnit unit) {
            return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - beginTime);
        }
        @Override
        public int compareTo(Delayed o) {
            return this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS) ? 1 : 0;
        }
    }
}

看输出:

二叉堆数组结构: b,e,c,d,a,f,g,h
消费线程1: b
消费线程2: e
消费线程2: a
消费线程1: c
消费线程3: f
消费线程2: d
消费线程1: h
消费线程3: g

哈,看数组结构,就很清楚已经按照了优先级进行处理了,并且这里每次出队一个元素就要进行相应的堆化操作

那么接下来分析代码:

1、核心属性
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();

理解了上面几个属性的作用,基本上也就理解了<u>DelayQueue</u>的一个设计了

  • ReentrantLock lock: 重入锁,保证操作安全
  • PriorityQueue<E> q:优先级队列,内部用数组存储延迟队列元素
  • Thread leader:指向的是第一个从队列获取元素阻塞等待的线程,目的是为了减少其他区线程不必要的等待时间
  • Condition available:唤醒条件,这里入队会唤醒,出队更换leader线程时也换唤醒,具体看后面代码。
2、入队操作
    public boolean offer(E e) {
      
        // 获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          
            // 元素入队,其实这里是PriorityQueue的入队操作,因此还会涉及到堆化操作,类似思路见PriorityBlockingQueue
            q.offer(e);
          
            // 若此时队列中首元素就是刚插入的,则将等待锁指向null,唤醒所有等待的线程。
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

注释简单明了,不多做解释。

3、出队操作
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                // 如果队列为空,阻塞等待
                if (first == null)
                    available.await();
              
                                // 否则需要校验取出的元素是否满足延迟
                else {
                    long delay = first.getDelay(NANOSECONDS);
                  
                    // 若延迟满足条件,直接取出返回
                    if (delay <= 0)
                        return q.poll();
                  
                    // 否则置为空,这里也是防止内存溢出吧,
                    // 因为多个线程会同时保持对first的引用,从而导致无法回收。
                    first = null; 
                  
                    // 如果leader不为空,当前线程就直接阻塞掉(毕竟leader才能获取元素)
                    if (leader != null)
                        available.await();
                    else {
                        // 如果leader为空,那就拿当前线程作为leader,然后等待delay时长
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // 一般在这里会释放掉leader的
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 如果leader为空并且队列不为空的情况下,通知其他的线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

相比入队操作要复杂多了,因为这里涉及到delay的一个判断跟当前等待线程的一个置换。其实这里逻辑很简单,只要保证只有leader线程能获取元素就好了,同时如果是自己获取完,释放leader即可。

4、LinkedTransferQueue

0、概要(异性等待)

LinkedTransferQueue就像一对异性(读写)朋友,当没有异性在队列中等待自己的话,自己就入队等待其他异性,否则直接匹配牵手成功,不用等待了,解放彼此。

话不多说,还是先来演示个小的demo,从demo入手

1、消费跟生产者同时存在
LinkedTransferQueue<Integer> q = new LinkedTransferQueue();

        new Thread(()->{
            try {
                while (true){
                    System.out.println(Thread.currentThread().getName() + "准备出队...");
                    Thread.sleep(1000);
                    Integer a = q.take();
                    System.out.println(Thread.currentThread().getName() + "出队完成,值为:" + a);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"线程B").start();

        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName() + "准备入队...");
                Thread.sleep(2000);
                q.transfer(1);
                System.out.println(Thread.currentThread().getName() + ": 1入队完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"线程A").start();

        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName() + "准备入队...");
                Thread.sleep(2000);
                q.transfer(2);
                System.out.println(Thread.currentThread().getName() + ": 2入队完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"线程C").start();

结果

线程B准备出队...
线程A准备入队...
线程C准备入队...
线程A: 1入队完成
线程B出队完成,值为:1
线程B准备出队...
线程C: 2入队完成
线程B出队完成,值为:2
线程B准备出队...
2、只有生产者
LinkedTransferQueue<Integer> q = new LinkedTransferQueue();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName() + "准备入队...");
                Thread.sleep(2000);
                q.transfer(1);
                System.out.println(Thread.currentThread().getName() + ": 1入队完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"线程A").start();

结果

线程A准备入队...

3、只有消费者
LinkedTransferQueue<Integer> q = new LinkedTransferQueue();

    new Thread(()->{
        try {
            while (true){
            System.out.println(Thread.currentThread().getName() + "准备出队...");
            Thread.sleep(1000);
            Integer a = q.take();
            Thread.sleep(10);
            System.out.println(Thread.currentThread().getName() + "出队完成,值为:" + a);
        }
        } catch (Exception e) {
            e.printStackTrace();
        }
},"线程B").start();

结果

线程B准备出队...

从上面结果可以看出

  • 1、当生产者跟消费者都同时存在的情况下,入队出队都能正常进行,只不过要出队一次才能入队,入队一次才能出队

  • 2、当只具备一方(生产者或者消费者)时,都不能进行下去,只能等待,哪怕入队也是如此。

以上功能就是LinkedTransferQueue具备SynchronousQueue的功能,当然LinkedTransferQueue还具LinkedBlockingQueues功能(这一点这里就不做详细演示)。

LinkedTransferQueue在这点上是一个阻塞模式,跟SynchronousQueue类似,不存元素;

当生产者入队时,如果发现没有消费者在等待(源码上就是队列的最后一个Node模式是生产模式,那就创建一个新的Node入队,当前生产线程绑定到该Node进行等待);此时如果消费者线程进入,就会直接判断当前获取的node是否是含有数据的(isData==true),如果是,直接取出即可,若不是,走下面的逻辑。

消费者到队列中出队元素时,如果发现队列是空,则会生成一个null节点,然后阻塞等待生产者,后面如果生产者入队时发现有一个消费者元素节点,直接将元素填充到该节点Node上,唤醒该节点的消费者线程,被唤醒的消费者线程直接出队即可。

下面直接那代码分析一下(先重点介绍一下特性):

  • transfer(E e):入队操作,若当前存在一个正在等待获取的消费者线程,立刻填充其node节点返回;否则,会生成node结构插入到队列尾部,进入阻塞状态,一直到消费者线程取走该元素
  • tryTransfer(E e):入队操作,transfer的尝试操作,存在等待的消费者线程则转交之,否则返回不入队
  • tryTransfer(E e, long timeout, TimeUnit unit):在tryTransferr(E e)增加一个等待时间,若timeout时间内都没有消费者线程取数据,则返回,不入队
  • hasWaitingConsumer():判断是否存在消费者线程
  • getWaitingConsumerCount():获取所有等待获取元素的消费线程数量
LinkedTransferQueue
4、源码分析
public void transfer(E e) throws InterruptedException {
  if (xfer(e, true, SYNC, 0) != null) {  // 核心逻辑,入队操作
    Thread.interrupted(); // failure possible only due to interrupt
    throw new InterruptedException();
  }
}
private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // 构造一个node节点

        retry:
        for (;;) {                            // 循环匹配处理

            for (Node h = head, p = h; p != null;) { // 头节点处理:find & match first node
                boolean isData = p.isData;    // 头节点处理
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    // 这里是关键,如果两者模式一样,则不能匹配,为啥?
                    // 因为生产者跟消费者操作都需要入队,即队列中存在的是对应的操作动作,如果isData为true,代表当前队列的头节点是一个生产者节点,存在相应的数据,那么消费者线程可以直接匹配获取;如果isData为false,代表当前队列的头节点是消费者节点,那么生产者线程可以直接交付数据;(一句话概括就是:如果队列中正在等我的是另一个线程,就可以直接转移了),其他情况只能入队等待啦。
                    if (isData == haveData)   // can't match
                        break;
                  
                    // 这里再尝试匹配一次
                    if (p.casItem(item, e)) { // match
                        // 循环处理,这里多个线程在竞争头节点哦,所以cas了下
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                      
                        // // 唤醒等待的线程
                        LockSupport.unpark(p.waiter);
                      
                        // 返回匹配的元素
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
              
                // 一次循环没有成功,或者说当前head已经被其他线程匹配掉了,那重新来过
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            // 终究没有成功,只能入队了(当前头节点类型跟自己一致)
            // 如果不是立即返回,那就看异同步了
            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                // 尝试入队
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    // 如果当前节点的上一个节点没了,就再尝试去匹配
                    continue retry;           // lost race vs opposite mode
                
              // 同步==阻塞等待,相当入队了,你给我等着吧
              if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }

上面注释应该很清楚,入队出队都是这个方法,其中isData是模式判断,用于是否匹配操作;how是执行方式,是同步、异步等;nanos是超时时间啦。简单总结如下

(1)入队或者出队,先查看队列头的节点模式(判断isData是否跟自己不是一样,不一样就能匹配了);

(2)如果模式不一样,就尝试让他们匹配(这其中会有其他线程也在操作,可能头节点被其他线程匹配走了,所以会持续往下,就是代码中的循环操作,知道队列尾部)

(3)如果模式一样,那就代表没有异性在等待自己了,或者到链表尾了,尝试入队等待吧;

(4)入队时也可能其他线程在操作哦,所以如果链表尾节点被修改了,那就再重新尝试入队,继续循环;

(5)入队成功了,就说明真没希望了,只能等了,等其他线程唤醒自己吧;‘

(6)唤醒了就说明有机会匹配了,仅仅是有机会哈,继续循环匹配吧。

NOW:立即返回,不管有没有匹配到,不做入队操作
   如:poll()、tryTransfer(e)

ASYNC:异步操作,入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
   如:add(e)、offer(e)、put(e)、offer(e, timeout, unit)

SYNC:同步操作,元素入队后当前线程阻塞,一直等待对方到来被匹配到
   如:take()、transfer(e)

TIMED:超时机制,入队会等待一段时间被匹配(等待对方的到来哦),超时还没匹配到就返回
   如:poll(timeout, unit)、tryTransfer(e, timeout, unit)

5、LinkedBlockingDeque

LinkedBlockingDequeDouble Ended Queue的缩写,基于链表实现的线程安全的双端阻塞队列,即可以从头部或者尾部去插入或者获取节点数据。默认构造长度为Integer.MAX_VALUE,当然长度可以在构造时指定。

其内部维护了一个全局独占锁,和两个Condition对象,用来阻塞和唤醒线程,以保证线程安全。

// 独占锁
final ReentrantLock lock = new ReentrantLock();
// 非空Condition
private final Condition notEmpty = lock.newCondition();
// 非满Condition
private final Condition notFull = lock.newCondition();
LinkedBlockingDeque

相应操作支持三种策略:

  • block 如果无法立即返回元素,则阻塞直到队列可以获取
  • block with timeout 如果无法立即返回元素,则超时阻塞一定时长,等待获取
  • special value 如果无法立即返回元素,则返回true、false、null等
  • throw exception 如果无法立即返回元素,则抛出指定异常
block block with timeout special value throw exception
putXxx offerXxx(TimeUint) offerXxx(O) addXxx
takeXxx pollXxx(TimeUint) pollXxx() getXxx
peekXxx() removeXxx

集中看一下代码

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>{

    /** 节点 */
    static final class Node<E> {
        /** 节点内容 */
        E item;

        /** 上一个节点 */
        Node<E> prev;

        /** 下一个节点 */
        Node<E> next;
    }

    /** 队列首节点 */
    transient Node<E> first;

    /** 队列尾节点 */
    transient Node<E> last;

    /** 队列节点数=队列长度,对外通过size()方法获取 */
    private transient int count;

    /** 队列最大容量 */
    private final int capacity;

    /** 主锁 */
    final ReentrantLock lock = new ReentrantLock();

    /** 非空条件 */
    private final Condition notEmpty = lock.newCondition();

    /** 非满条件 */
    private final Condition notFull = lock.newCondition();、
    
    /**
     * 头部插入一个节点(内容),如果满了返回false
     * 私有方法,入队操作方法之一
     * 操作方向:队列头部
     * 唤醒:notEmpty
     */
    private boolean linkFirst(Node<E> node)

    /**
     * 尾部插入一个节点(内容),如果满了返回false
     * 私有方法,入队操作方法之一
     * 操作方向:队列尾部
     * 唤醒:notEmpty
     */
    private boolean linkLast(Node<E> node)
      
    /**
     * 头部插入一个节点(内容),如果空了返回false
     * 私有方法,出队操作方法之一
     * 操作方向:队列头部
     * 唤醒:notFull
     */
    private E unlinkFirst()
      
    /**
     * 尾部移除一个节点(内容),如果空了返回false
     * 私有方法,出队操作方法之一
     * 操作方向:队列尾部
     * 唤醒:notFull
     */
    private E unlinkLast()

    /**
     * 移除指定节点(内容)
     * 唤醒:notFull
     */
    void unlink(Node<E> x)

    /**
     * 头部插入一个节点(内容)
     * 操作方向:队列头部
     * 策略:不成功抛出异常
     */
    public void addFirst(E e)

    /**
     * 尾部插入一个节点(内容)
     * 操作方向:队列尾部
     * 策略:不成功抛出异常
     */
    public void addLast(E e) 

    /**
     * 头部插入一个节点(内容)
     * 操作方向:队列头部
     * 策略:返回true或者false
     */
    public boolean offerFirst(E e)

    /**
     * 尾部插入一个节点(内容)
     * 操作方向:队列尾部
     * 策略:返回true或者false
     */
    public boolean offerLast(E e)

    /**
     * 头部插入一个节点(内容)
     * 操作方向:队列头部
     * 策略:阻塞等待
     */
    public void putFirst(E e) throws InterruptedException

    /**
     * 尾部插入一个节点(内容)
     * 操作方向:队列尾部
     * 策略:阻塞等待
     */
    public void putLast(E e) throws InterruptedException

    /**
     * 头部插入一个节点(内容)
     * 操作方向:队列头部
     * 策略:阻塞超时等待
     */
    public boolean offerFirst(E e, long timeout, TimeUnit unit)throws InterruptedException 

    /**
     * 尾部插入一个节点(内容)
     * 操作方向:队列尾部
     * 策略:阻塞超时等待
     */
    public boolean offerLast(E e, long timeout, TimeUnit unit)throws InterruptedException

    /**
     * 头部移除一个节点(内容)
     * 操作方向:队列头部
     * 策略:失败抛出异常
     */
    public E removeFirst()
      
    /**
     * 尾部移除一个节点(内容)
     * 操作方向:队列尾部
     * 策略:失败抛出异常
     */
    public E removeLast()
      
    /**
     * 头部移除一个节点(内容)
     * 操作方向:队列头部
     * 策略:返回true或者false
     */  
    public E pollFirst()
      
    /**
     * 尾部移除一个节点(内容)
     * 操作方向:队列尾部
     * 策略:返回true或者false
     */  
    public E pollLast() 
      
    /**
     * 头部移除一个节点(内容)
     * 操作方向:队列头部
     * 策略:阻塞等待
     */
    public E takeFirst() throws InterruptedException 
      
    /**
     * 尾部移除一个节点(内容)
     * 操作方向:队列尾部
     * 策略:阻塞等待
     */
    public E takeLast() throws InterruptedException
      
        /**
     * 头部移除一个节点(内容)
     * 操作方向:队列头部
     * 策略:阻塞超时等待
     */
    public E pollFirst(long timeout, TimeUnit unit)throws InterruptedException
      
        /**
     * 尾部移除一个节点(内容)
     * 操作方向:队列尾部
     * 策略:阻塞超时等待
     */
    public E pollLast(long timeout, TimeUnit unit)throws InterruptedException

        /**
     * 获取首节点(内容),非移除
     * 操作方向:队列头部
     * 策略:失败抛出异常
     */
    public E getFirst()
      
        /**
     * 获取尾节点(内容),非移除
     * 操作方向:队列尾部
     * 策略:失败抛出异常
     */
    public E getLast()
      
        /**
     * 获取首节点(内容),非移除
     * 操作方向:队列头部
     * 策略:失败返回null
     */
    public E peekFirst()

        /**
     * 获取尾节点(内容),非移除
     * 操作方向:队列尾部
     * 策略:失败返回null
     */
    public E peekLast()
      
        /**
     * 从头开始遍历,移除指定节点
     * 策略:返回true或者false
     */
    public boolean removeFirstOccurrence(Object o)

    /**
     * 从尾开始遍历,移除指定节点
     * 策略:返回true或者false
     */
    public boolean removeLastOccurrence(Object o)

    /**
     * 默认首部添加
     * 策略:失败异常
     */
    public void push(E e)

    /**
     * 默认首部移除
     * 策略:失败异常
     */
    public E pop()
      
    /** ========================================================== */

    /** 剩下的方法基本都是从父类继承,与其他结构类似,此处不做详细介绍了,如: */
      
    /**
     * 继承至AbstractQueue,默认尾部添加
     * 策略:失败异常
     */
    public boolean add(E e)

    /**
     * 继承至Queue,默认尾部添加
     * 策略:返回true或者false
     */
    public boolean offer(E e)
      
    /** ========================================================== */
      
    /** 拆分迭代器 */
    static final class LBDSpliterator<E> implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;  // max batch array size;
        final LinkedBlockingDeque<E> queue;
        Node<E> current;    // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes
        long est;           // size estimate

        public long estimateSize() { return est; }

        /** 此处拆分以步进的方式进行,每次多1,直到整个queue元素全部拆完,下面以代码演示比较直观 */
        public Spliterator<E> trySplit()

        public void forEachRemaining(Consumer<? super E> action)

        public boolean tryAdvance(Consumer<? super E> action)

        public int characteristics()
    }

}

分割器测试效果

@Test
public void testSplit() throws InterruptedException {
   BlockingDeque<Integer> deque = new LinkedBlockingDeque<>(1000);
   Spliterator s = deque.spliterator();
   for(int i =0; i<100; i++){
        deque.put(i);
   }
   while(true){
       System.out.println();
       s.trySplit().forEachRemaining(k -> System.out.print(k+","));
   }
}
 
## 输出
0,
1,2,
3,4,5,
6,7,8,9,
10,11,12,13,14,
15,16,17,18,19,20,
21,22,23,24,25,26,27,
28,29,30,31,32,33,34,35,
36,37,38,39,40,41,42,43,44,
45,46,47,48,49,50,51,52,53,54,
55,56,57,58,59,60,61,62,63,64,65,
66,67,68,69,70,71,72,73,74,75,76,77,
78,79,80,81,82,83,84,85,86,87,88,89,90,
91,92,93,94,95,96,97,98,99,

6、LinkedBlockingQueue

<u>LinkedBlockingQueue</u>是FIFO队列,基于链表实现的线程安全的先进先出阻塞队列,即只可以从头部获取元素,尾部插入元素。默认构造长度为Integer.MAX_VALUE,当然长度可以在构造时指定。

明白了上面介绍的LinkedBlockingDeque,理解LinkedBlockingQueue是非常简单的,其实这里LinkedBlockingDeque功能覆盖了LinkedBlockingDeque功能,其putLast(E e) 和takeFirst()就可以模拟其FIFO效果。

当然主要差异是LinkedBlockingDeque是双向链表,而LinkedBlockingQueue是单向链表。

LinkedBlockingQueue

其内部维护了一个全局独占锁,和两个Condition对象,用来阻塞和唤醒线程,以保证线程安全。

/** 取元素锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 非空条件 */
private final Condition notEmpty = takeLock.newCondition();

/** 存元素锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 非满条件 */
private final Condition notFull = putLock.newCondition();

相关文章

网友评论

      本文标题:2.BlockingQueue综合分析

      本文链接:https://www.haomeiwen.com/subject/arrbektx.html