美文网首页
读LinkedBlockingQueue源码记录

读LinkedBlockingQueue源码记录

作者: 浅笑丨无痕 | 来源:发表于2018-08-18 21:00 被阅读0次

简介

    LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。

  和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。

探索LinkedBlockingQueue

1.主要属性

static class Node<E> {
    //数据存储
    E item; 
    //后继节点
    Node<E> next;   
    Node(E x) { item = x; }
}
//最大容量,默认为Integer.MAX_VALUE
private final int capacity;
//当前队列中的元素数量,与ArrayBlockingQueue不同
private final AtomicInteger count = new AtomicInteger(0);
//头节点,head.item恒为null 
private transient Node<E> head;
//尾节点,last.next恒为null
private transient Node<E> last;
//出列锁
private final ReentrantLock takeLock = new ReentrantLock();
//非空条件
private final Condition notEmpty = takeLock.newCondition();
//入列锁
private final ReentrantLock putLock = new ReentrantLock();
//非满条件
private final Condition notFull = putLock.newCondition();

从上面的属性可以看出,LinkedBlockingQueue主要以链表的方式进行数据存储,出列与入列通过不同的锁进行线程安全控制。

2.初始化

接着看构造方法。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);//默认最大容量为Integer.MAX_VALUE
}
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //初始化收尾节点
    last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    //加入列锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); 
    try {
        int n = 0;
        //遍历入列
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //处理尾节点
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

再看enqueue()方法:可读性不是很好,分析后,可以等价于last.next = node; last = node;即将node链接到当前队列的末尾,然后将末节点设置为当前node。

private void enqueue(Node<E> node) {
    last = last.next = node;//等价于 last.next = node; last = node;
}

3.入列

入列方法有offer(),和put()。

offer()方法:作用是将元素e插入到阻塞队列的尾部,如果队满,返回false,即插入失败。否则,插入元素。


public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    //获取当前元素数量
    final AtomicInteger count = this.count;
    //等于队列的最大容量,返回false
    if (count.get() == capacity)
        return false;
    int c = -1; //-1标识入列失败
    Node<E> node = new Node(e);
    //加锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //判断当前数量是否小于最大容量
        if (count.get() < capacity) {
            //尾节点处理
            enqueue(node);
            //当前容量+1
            c = count.getAndIncrement();
            //判断当前容量是否大于最大容量,如果小于最大容量,唤醒其他入列线程。
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // c!=-1即已经走了上面的代码的逻辑,所以此时队列长度不为0
    // 唤醒等待在NotEmpty条件的线程
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

再看 signalNotEmpty()方法:主要是唤醒等待在notEmpty线程的出列操作。

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

offer()同样有重载一个延迟等待方法,可以看到该方法只是多了等待时间,如果超时就返回false,其他与offer()的逻辑一样。

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    //获取等待时间(纳秒)
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //设置线程可中断
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            //设置最长等待时间
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

再看put()方法:与offer(E e, long timeout, TimeUnit unit)不同的地方在于,当队满时无限等待,这里不做多解释。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //当前队列满,则持续等待。
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

4.出列

出列的方法有take(),poll()等,由于逻辑基本相似,这里只分析take()方法。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //加可中断锁
    takeLock.lockInterruptibly();
    try {
        //当前队列长度为0,线程等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出列操作
        x = dequeue();
        //队列长度减一
        c = count.getAndDecrement();
        //队列长度还大于1,唤醒其他出队线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // c!=-1 说明已经执行上面的逻辑,此时队列长度可能小于最大长度,唤醒等待在NotFull入队线程
    if (c == capacity)
        signalNotFull();
    return x;
}

再看dequeue()方法:处理头结点的方法,获取第一个元素后,断开链表,让第一个结点作为头结点以及将其的数据部分设置为空,同时让原来的头结点自关联帮助GC。

这里我觉得有点奇怪,为什么不直接把first结点断开,然后h.next = first.next. 最后将first.next=null?

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // 将头结点的后继结点指向自己,有助于GC
    head = first;   //将头结点
    E x = first.item;
    first.item = null;  //头结点具有一致性:即数据为null,后继结点才是真实数据结点
    return x;
}

5.其他方法

remainingCapacity()查看剩余的长度;

size() () 查看当前队列长度;

clear() 清除当前队列所有数据;

....

ArrayBlockingQueue和LinkedBlockingQueue比较

1.ArrayBlockingQueue一个有界的阻塞队列,由于其底层基于数组,初始化后就会立即在内存分配固定大小容量的数组元素;而LinkedBlockingQueue配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的,默认大小为Integer.MAX_VALUE的容量,由于其节点的创建都是动态创建,并且在节点出队列后可以被GC所回收,因此其具有灵活的伸缩性。

2.LinkedBlockingQueue的读取和插入操作所使用的锁是两个不同的lock,它们之间的操作互相不受干扰,因此两种操作可以并行完成,而ArrayBlockingQueue中在入队列和出队列操作过程中,使用的是同一个lock,所以即使在多核CPU的情况下,其读取和操作的都无法做到并行,而故LinkedBlockingQueue的吞吐量要高于后者。

相关文章

网友评论

      本文标题:读LinkedBlockingQueue源码记录

      本文链接:https://www.haomeiwen.com/subject/dmqqiftx.html