美文网首页并发
Java1.8-LinkedBlockingQueue源码学习(

Java1.8-LinkedBlockingQueue源码学习(

作者: 骑着乌龟去看海 | 来源:发表于2019-03-01 10:35 被阅读23次

    一、概述

      上文我们学习了ArrayBlockingQueue的源码,本篇我们接着来学习LinkedBlockingQueue。ArrayBlockingQueue底层通过数组来实现,而与数组相对应,LinkedBlockingQueue底层是通过链表来实现的。

    LinkedBlockingQueue 是基于链表实现的有界的阻塞队列。为了防止链表过度扩展,LinkedBlockingQueue的容量是可选的,如果未指定容量,容量最大值是Integer.MAX_VALUE;

    二、源码

    1. 继承结构

    继承结构和ArrayBlockingQueue是一样的,这里就不多说了:

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
    2. 属性

    接下来,我们先来看一下它的一些属性:

    /** 链表容量, 如果没有指定,默认Integer.MAX_VALUE */
    private final int capacity;
    
    /** 链表中元素的个数 */
    private final AtomicInteger count = new AtomicInteger();
    
    /** 链表的头节点,头节点中不保存元素 */
    transient Node<E> head;
    
    /** 链表的尾节点,尾节点中不保存引用 */
    private transient Node<E> last;
    
    /** 出队元素锁 */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /** 出队元素的Condition条件 */
    private final Condition notEmpty = takeLock.newCondition();
    
    /** 入队元素锁 */
    private final ReentrantLock putLock = new ReentrantLock();
    
    /** 入队元素的Condition条件 */
    private final Condition notFull = putLock.newCondition();
    

    从这里可以看出,LinkedBlockingQueue包含了读重入锁,写重入锁,实现了锁的读写分离,并且不同的锁有不同的Condition条件。并且头节点和尾节点还有一些特性:

    头节点不包含元素,即head.item == null,也就是head.item一直为null;而尾节点不包含引用,即last.next == null,也就是last.next一直为null。

    3. 构造方法

    先来看一下前两个简单的构造方法:

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingQueue(int capacity) {
        // 容量值不能小于0
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // 初始化头节点和尾节点
        last = head = new Node<E>(null);
    }
    

    默认情况下,队列的容量是Integer.MAX_VALUE。来看另一个构造方法:

    public LinkedBlockingQueue(Collection<? extends E> c) {
        // 初始化最大容量
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            // 循环将集合中元素添加到队列中
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                // 集合容量不能大于队列容量
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            // 设置数组容量
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
    

    从给定集合初始化一个队列,并且队列中元素不能为null,所给定集合容量不能大于队列容量,然后将元素转化为Node节点,保存到队列中。这里调用了入队的方法enqueue,后面再来介绍这个方法。

    4. 内部类

    LinkedBlockingQueue中提供了一个内部类Node,用来保存元素,这一点和ArrayBlockingQueue是不同的,因为链表除了保存常规的元素外,还需要有指针,指向下一个节点,来简单看一下源码:

    static class Node<E> {
        E item;
    
        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;
    
        Node(E x) { item = x; }
    }
    

    该节点比较简单,除了有一个元素类型之外,还有一个next指针,指向链表的下一个节点,该值如果为null,表示该节点没有后继节点,也就是该节点是最后一个节点。

    5. 方法
    5.1 put方法

    首先来看下put方法,该方法是一个阻塞的入队方法,来看下源码:

    public void put(E e) throws InterruptedException {
        // 元素非空判断
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        // 将元素转化为Node节点
        Node<E> node = new Node<E>(e);
        // 获取入队锁,及链表中元素个数
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 线程可中断
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // 如果队列满了
            while (count.get() == capacity) {
                // 阻塞notFull条件对应的线程
                notFull.await();
            }
            // 线程可用了,进行入队
            enqueue(node);
            // 元素个数原子性增加,返回到是增加前的数量
            c = count.getAndIncrement();
            // 如果增加后的数量小于队列的容量,唤醒在notFull上等待的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // 如果元素个数为0,表示队列原先为空,此时添加成功了,队列里有元素了,
        // 则需要唤醒在notEmpty条件上等待的线程
        if (c == 0)
            signalNotEmpty();
    }
    

    来简单看下流程:

    • 首先非空校验,然后将元素转化为节点,并获取锁;
    • 然后判断队列是否已满,如果满了,在notFull条件上进行等待;
    • 如果没满,入队操作,然后更新队列元素,若元素数量小于队列容量,唤醒在notFull上等待的线程,通知他们队列有可用空间了;
    • 释放锁后,判断元素入队前,队列中的元素是否为0,也就是说队列原先是否为空;如果原来的队列为空,而现在已经添加进元素了,所以激活notEmpty条件对应的线程,通知他们队列里已经有元素了。

    这中间调用了入队的方法enqueue,这里来简单看下:

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
    

    该方法表示将节点添加至队列的尾部,然后更新链表的尾节点,可以画个简单的链表操作图来帮助理解。可能需要注意一点:

    尾节点是不包含引用的,只包含元素对象;

    至于唤醒notEmpty的方法signalNotEmpty,则比较简单,这里来看下源码:

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    
    5.2 offer方法

    put方法也就是入队方法,如果队列已满,返回false,其他的操作和put类似,这里来简单看下源码:

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // 队列满了,直接返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 队列没满
            if (count.get() < capacity) {
                // 入队
                enqueue(node);
                c = count.getAndIncrement();
                // 如果添加后队列依旧没满,唤醒notFull对应线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // 如果原队列为空,现在添加后不为空了,唤醒等在notEmpty上的线程
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    

    而重载的超时方法offer(E, long, TimeUnit)逻辑实现也差不太多,来简单看下:

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    
        if (e == null) throw new NullPointerException();
        // 获取超时时间
        long nanos = unit.toNanos(timeout);
        int c = -1;
        // 获取入队锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 如果队列已满
            while (count.get() == capacity) {
                // 然后再判断超时时间是否大于0,
                if (nanos <= 0)
                    return false;
                // 等待对应的超时时间,并且重置超时时间
                // 为什么重置呢,因为可能被notFull条件对应的线程进行唤醒操作
                nanos = notFull.awaitNanos(nanos);
            }
            // 入队
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    
    5.3 take方法

    take方法是一个阻塞的出队方法,其与put方法是相对应的,我们来看一下:

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 如果队列为空,阻塞,直到队列不为空
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 出队
            x = dequeue();
            c = count.getAndDecrement();
            // 如果出队后队列依旧不为空,唤醒notEmpty上等待的线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 如果原来队列已满,现在出队了一个,说明队列可用,唤醒notFull对应等待的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    来简单看下流程,流程其实是和put相对应:

    • 先获取出队的锁,然后判断队列是否为空,如果为空,一直阻塞知道队列不为空;
    • 队列不为空,出队,出队后如果队列依旧不为空,唤醒notEmpty上等待的线程;
    • 如果原来队列已满,现在出队了一个,说明队列可用了,则唤醒notFull条件上对应等待的线程。

    这里调用到了出队方法dequeue方法,来简单看下:

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        // 原先头节点
        Node<E> h = head;
        // 原先头节点的下一个节点(第一个节点)
        Node<E> first = h.next;
        // 原先头节点的next指针指向自己
        h.next = h; // help GC
        // 头节点后移
        head = first;
        // 获取第一个节点的元素值
        E x = first.item;
        // 将原来的第一个节点设置为null
        first.item = null;
        return x;
    }
    

    该方法的目的是为了更新头节点,将头节点更新为原先头节点的下一个节点,可能需要注意的是:

    头节点是不保存元素值的,只保存指向下一个节点的指针。

    而至于signalNotFull方法,就不多说了。

    5.4 poll方法

    出队方法poll,表示如果队列为空,直接返回null,其他操作与take方法类似,来简单看下源码:

    public E poll() {
        final AtomicInteger count = this.count;
        // 队列为空,直接返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 队列不为空,也就是队列中有元素
            if (count.get() > 0) {
                // 出队
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // 同理,唤醒
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    而超时的poll方法与超时的offer方法是类似的,这里只贴下源码,不多说了:

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    5.5 peek方法

    至于peek方法,就比较简单,也没必要多说了:

    public E peek() {
        // 队列为空
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 获取头节点的下一个节点
            Node<E> first = head.next;
            // 如果第一个节点为空
            if (first == null)
                return null;
            else
                // 取值
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
    
    5.6 remove方法

    remove方法表示移除队列中相应的元素,来看下源码:

    public boolean remove(Object o) {
        if (o == null) return false;
        // 获取入队锁和出队锁
        fullyLock();
        try {
            // 循环遍历,从队头到队尾
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    // 移除元素
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            // 释放入队锁和出队锁
            fullyUnlock();
        }
    }
    

    我们来简单看下流程:

    • 首先会获取入队锁和出队锁,因为移除元素的时候不允许再入队或出队;
    • 然后会循环遍历,从队头遍历到队尾,判断元素是否相等,相等的话进行移除操作;
    • 最后释放相应的锁。

    这里移除元素的操作会调用unlink方法,该方法用于将指定节点从链表中断开,我们来简单看下这个方法:

    /**
     * 这里trail节点是p节点的前驱节点
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        // 将p节点元素置为null
        p.item = null;
        // 将trail节点指向的下一个节点的引用,指向p节点的下一个节点
        // 也就是将p节点断开
        trail.next = p.next;
        // 如果p节点是尾节点,重置尾节点
        if (last == p)
            last = trail;
        // 如果原来队列已满,这里移除后,队列就可用了,唤醒在notFull上等待的线程
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
    

    这里的操作还会判断如果移除元素之前,队列已满,那么移除之后,队列就可用了,这时候会唤醒在notFull上等待的线程。另外,fullyLockfullyUnlock比较简单,就不多说了:

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    
    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
    

    四、总结

    到这里,大部分常用的方法都源码我们都学习过了,这里我们进行简单的总结下:

    • LinkedBlockingQueue是基于链表实现的阻塞队列,容量可选,不指定容量的话,默认是Integer.MAX_VALUE;
    • LinkedBlockingQueue针对入队和出队分别提供了各自的可重入锁,实现了读写分离。

    本文参考链接除了官方文档外,还参考自:
    《Java并发编程实战》
    【JUC】JDK1.8源码分析之LinkedBlockingQueue(四)

    相关文章

      网友评论

        本文标题:Java1.8-LinkedBlockingQueue源码学习(

        本文链接:https://www.haomeiwen.com/subject/xkwruqtx.html