美文网首页
ConcurrentLinkedQueue, LinkedBlo

ConcurrentLinkedQueue, LinkedBlo

作者: 以梦为马驾驾驾 | 来源:发表于2021-11-04 20:42 被阅读0次

ConcurrentLinkedQueue, LinkedBlockingQueue源码阅读

concurrentLinkedQueue 和 LinkedBlockQueue不同,前者是lock-free算法的可被并发读写的队列,后者是阻塞的并发读写的队列,性能上来说,必然是前者优于后者。但是他们又有区别,后者可以当作缓冲的消息代理使用,通知插入或者读取的线程可以进行操作,前者不行。

ConcurrentLinkedQueue <= Queue

LinkedBlockingQueue <= BlockingQueue <= Queue

先来看LinkedBlockingQueue

private final int capacity;  
  
/** Current number of elements */  
private final AtomicInteger count = new AtomicInteger();  
  
/**  
 * Head of linked list. * Invariant: head.item == null */transient Node<E> head;  
  
/**  
 * Tail of linked list. * Invariant: last.next == null */private transient Node<E> last;  
  
/** Lock held by take, poll, etc */  
private final ReentrantLock takeLock = new ReentrantLock();  
  
/** Wait queue for waiting takes */  
private final Condition notEmpty = takeLock.newCondition();  
  
/** Lock held by put, offer, etc */  
private final ReentrantLock putLock = new ReentrantLock();  
  
/** Wait queue for waiting puts */  
private final Condition notFull = putLock.newCondition();

相比较于ConcurrentLinkedQueue,它比较简单,用了两个锁,以及关联在锁上的条件队列, 还有一个原子变量 AtomicInteger。

public void put(E e) throws InterruptedException {  
     if (e == null) throw new NullPointerException();  
     int c = -1; // set count = -1,是为了防御,当修改list操作发生错误的时候,count=-1可以指示错误
     Node<E> node = new Node<E>(e);   //  1. 新建node, 为什么node 以及node的域不是 volatile呢?以及insert的修改怎么能保证立刻可见呢?
     final ReentrantLock putLock = this.putLock;  
     final AtomicInteger count = this.count;  
      putLock.lockInterruptibly();  
     try {  
              while (count.get() == capacity) {   
                notFull.await();  // 2. 如果队列达到指定容量了,这个线程操作在notFull条件队列上的等待着
      }  
            enqueue(node);  
      c = count.getAndIncrement();  
     if (c + 1 < capacity)   // 3. 还有空的位置
                notFull.signal();  
      } finally {  
            putLock.unlock();  
      }  
     if (c == 0)   // 4. 为空,是怎么走到这一步的?
        signalNotEmpty();  
}
  1. node以及node域之所以可以不是volatile,是因为ReentrantLockAtomicInteger,首先lock的实现依赖于AQS,AQS的实现依赖于一个volatilestate域,加锁和释放锁,会对这个域进行读写会禁止某些重排序操作,并且将新的值写回主存,之后获取这个put锁的生产者线程将受到值变更的消息,重新从主存中读取;而消费者线程也会读取原子变量,而感知到自己需要的值的变化。
  2. 因为put是互斥的操作,只会有一个put线程能走到这里,然后如果容量满了,就等在条件上,直到被消费者线程唤醒,或者被其他生产者唤醒? c 是put之前的容量值, c+1是现在的,如果现在的容量之小于,说明还有空的,并且signal一下,防止有阻塞的生产者线程。
  3. 如2中陈述,如果put以后还有空位,继续唤醒其他的。()
  4. 在put完之后,检查c的值,如果是0, 即 count.getAndIncrement的之前的值是0,说明很可能有消费者已经阻塞,在notEmpty条件队列上等待着消费呢。
public E take() throws InterruptedException {  
     E x;  
     int c = -1;  
     final AtomicInteger count = this.count;  
     final ReentrantLock takeLock = this.takeLock;  
     takeLock.lockInterruptibly();  
     try {  
            while (count.get() == 0) {  // 1. 为空,等
                notEmpty.await();  
      }  
            x = dequeue();      // 4. 具体做了些什么
      c = count.getAndDecrement();  
     if (c > 1)                 // 2. 之前不止有一个,有多个
                notEmpty.signal();  
      } finally {  
            takeLock.unlock();  
      }  
        if (c == capacity)  // 3. 怎么会走到这里
            signalNotFull();  
     return x;  
}

private E dequeue() {  
    // assert takeLock.isHeldByCurrentThread();  
 // assert head.item == null;  Node<E> h = head;  
  Node<E> first = h.next;  
  h.next = h; // help GC  
  head = first;  
  E x = first.item;  
  first.item = null;  
 return x;  
}

  1. 没啥好说的,如果没有元素,只能等待
  2. 当下还有元素,如果有正在阻塞的消费者,就让它继续执行
  3. c==capacity,是在消费之前,元素个数满了,很可能有正在阻塞的生产者。
  4. 将head引用向后移动,并且减少一些引用赋值,加快与gc的相遇

在put与take中,为什么有3和2的情况?即为什么会有其他同样类型的线程需要同样的线程来唤醒,而不是由对立线程唤醒?
答:
场景举例:
a: capacity=10, 现在有5个生产者阻塞,在等待notFull信号,n个消费者过来消费了当前的n个元素,然后只有一个会调用signalNotFull, 会挑选最先阻塞的那个put线程,让他离开条件等待队列,进入同步队列,这个put线程会立刻获取到lock(如果没有新的来竞争), 但还是n-1个空缺,所以需要让生产者自己来唤醒自己的同类。反过来也是同理。

这样做的几个特点和好处:

  1. 区别于信号量控制的生产者和消费者模式,固定工作区大小的生产者和消费者,由对方来唤起,且每增加或者消费,就发信号,这里之所以不这么做是因为,signal是没有计数状态的,而信号量是可以有计数状态的。
  2. 不signalAll, 还可以保证同步队列中的顺序和wait条件队列中的顺序一致(如果在第一个唤醒的生产者lock期间,没有其他新的生产者过来lock,因为如果有的话,他会先一步wait条件队列中的其他线程排在了同步队列里,不过这并不是问题),避免了争夺锁,失败重新进入队列
private void signalNotEmpty() {  
    final ReentrantLock takeLock = this.takeLock;  
  takeLock.lock();  
 try {  
        notEmpty.signal();  
  } finally {  
        takeLock.unlock();  
  }  
}
private void signalNotFull() {  
    final ReentrantLock putLock = this.putLock;  
  putLock.lock();  
 try {  
        notFull.signal();  
  } finally {  
        putLock.unlock();  
  }  
}

再回到可以指定等待时间的其他方法

nanos = notEmpty.awaitNanos(nanos); // 不过是awaiNanos,超时后,返回0或者负数,如果没超时,返回剩余的时间

删除操作会锁柱Put和Take,然后遍历,找到node,并且将其从链表中去掉,再检查是否有Put被阻塞,有则将其唤醒。

另外不能插入null, 因为poll方法在列表为空的时候返回就是null, 如果能插入null, 就无法判断null是插入的, 还是代表列表为空的情况

再看ConcurrentLinkedQueue

它的实现中没有用到同步器,主要依靠CAS实现,是一种无锁算法(Lock-Free),不会阻塞线程,但其实现也是最复杂的。
首先它的实现是参考的:
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.实际上LinkedBlockingQueue也是参考的这篇论文。
另外在这个知乎回答里:求推荐Lock-Free 算法相关论文? - 知乎 (zhihu.com)
有关于lock-free算法论文的推荐。

image.png

每个元素都会被包装成Node

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
    Node(E item) { UNSAFE.putObject( this, itemOffset, item); }
    boolean castItem(E cmp, E val) { return UNSAFE.compareAndSwapObjectu( this, itemOffset, cmp, val); }
    void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
    boolean casetNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
    private static final sun.misc.Unsafe UNSAFE;  
private static final long itemOffset;  
private static final long nextOffset;  
  
static {  
    try {  
        UNSAFE = sun.misc.Unsafe.getUnsafe();  
  Class<?> k = Node.class;  
  itemOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("item"));  
  nextOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("next"));  
  } catch (Exception e) {  
        throw new Error(e);  
  }  
}
}

关键的属性:


private transient volatile Node<E> head;
private transient volatile Node<E> tail;

加入方法


public boolean offer(E e) {  
    checkNotNull(e);  
     final Node<E> newNode = new Node<E>(e);  
      
     for (Node<E> t = tail, p = t;;) {   // 1. 循环,直至成功
            Node<E> q = p.next;  
         if (q == null) {               // 2. 说明q是尾后节点, p是尾节点
                    // p is last node  
              if (p.casNext(null, newNode)) {   // 3. cas 设置
                  if (p != t) // hop two nodes at a time   // 4. 一次跳两个来设置尾节点
                      casTail(t, newNode); // Failure is OK.  
                  return true;  
              }   
                    // Lost CAS race to another thread; re-read next  
          } else if (p == q)       // 5. 这个情况是怎么来的?
                p = (t != (t = tail)) ? t : head;  
           else                    // 6. 这个情况又是怎么来的?
                p = (p != t && t != (t = tail)) ? t : q;  
          }  
}
  1. 循环,直到cas 插入成功才退出,循环每次都会因为 5, 6而更新 p
  2. 假设现在tail就是尾节点, 插入一个值。q == null , 说明p是尾节点,如果在这个时候,没有竞争,则cas操作插入一定没问题。如果有竞争,则cas操作会失败,此后p不再是最后的节点,是倒数第n个节点,则q 也不会再是 null。 随后可能是5, 也可能是6。 假设tail不是尾节点
  3. cas操作, 原子更新
  4. 根据代码注释: 属于一种优化,减少cas的次数
  5. 走到5是因为, 对单个元素的list进行出队操作,而造成的一种特殊情况,先看出队操作。
  6. p = (p != t && t != (t = tail)) ? t : q => p首先沿着链往下,在并发插入的过程中,寻找这最后的尾节点
    a. (p != t && t != (t = tail)) == true p 不是t ,且tail在并发插入的过程中,变化了,t是执行这条指令时最新的tail,则设置p为t ,为tail
    b. (p != t && t != (t = tail)) == false b1: p ==t 可以理解为a之后的cas插入又失败了, 因为还有其他线程在并发插入,则把p 设置为自己的后继,往下走 b2: b1之后, p != t, 但发生了 t == (t = tail), 即理解为并发插入停下了, p != t, 其实隐含着,p在t的后面的位置,所以p变成自己的后继,往下走

提取方法

public E poll() {  
    restartFromHead:                        // 1. label
    for (;;) {                              // 死循环
        for (Node<E> h = head, p = h, q;;) {  
            E item = p.item;                // 2.  2->3可能发生变化
            if (item != null && p.casItem(item, null)) {  // 3.   
                  // Successful CAS is the linearization point  
                  // for item to be removed from this queue.
                  if (p != h) // hop two nodes at a time    // 4 
                      updateHead(h, ((q = p.next) != null) ? q : p);  // 5
                  return item;  
             } else if ((q = p.next) == null) {  //6. 
                updateHead(h, p);  
                return null; 
            }  else if (p == q)   // 7.
                 continue restartFromHead;  
              else  p = q;  // 8.
           }  
     }  
}

  1. 首当其冲的是: restartFromHead 标签
  2. 因为提取操作只会从头节点的下个元素开始,如果没有别提取:
    a. item == null, 走到8,p是第一个元素, 如果3成功,则提取完成, 走到4,修改head.
    如果被提取,
    a. 或者:当前的p其实可能指向旧的head(也可能是新的,如果还没updateHead),head的item == null
    c. 或者 :当前的pp.casItem(item, null) 失败
    还是会走到8, 直到3处成功
  3. cas
    4.5 更新head节点,如果被弹出的节点,有后继节点,取后继节点, 没有后继节点,取被弹出去的node(反正item已经为null了), 作为 head。并且让原来的head的next指向自己。这个就造成了单个节点的自环,且由于从一开始head和tail是同一个元素,且tail是慢更新的,很可能tail指向这个旧的单个节点的自环,所以在上面的插入操作中的5, p == q 所以,要从head从头开始。
  4. 这个情况是 有一个线程要获取元素的时候,刚进入循环,读取了head, 其他一个线程迅速完成了其他元素的读取,导致这个敢读的head是个自环,所以需要从新开始。

相关文章

网友评论

      本文标题:ConcurrentLinkedQueue, LinkedBlo

      本文链接:https://www.haomeiwen.com/subject/zmnkzltx.html