美文网首页一些收藏Java技术升华面试精选
Java并发编程——LinkedBlockingQueue

Java并发编程——LinkedBlockingQueue

作者: 小波同学 | 来源:发表于2021-12-18 22:38 被阅读0次

    一、阻塞队列 BlockingQueue

    在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

    1.1、BlockingQueue的基本原理

    先来解释一下阻塞队列:


    如上图:

    • 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
    • 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

    阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

    阻塞队列的常用方法

    查阅BlockingQueue总结了以下阻塞队列的方法:

    1、boolean add(E e)

    • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。

    2、boolean offer(E e)

    • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。

    3、void put(E e)

    • 直接在队列中插入元素,当无可用空间时候,阻塞等待。

    4、boolean offer(E e, long timeout, TimeUnit unit)

    • 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。

    5、E take()

    • 获取并移除队列头部的元素,无元素时候阻塞等待。

    6、E poll( long time, timeunit unit)

    • 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。

    7、boolean remove()

    • 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。

    8、E element()

    • 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。

    9、E peek()

    • 不移除的情况下返回列头部的元素,队列为空无元素时返回null。

    注意:
    根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。
    以上支持阻塞和超时的方法都是能够响应中断的。

    1.2、BlockingQueue的实现

    BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

    下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

    二、LinkedBlockingQueue

    LinkedBlockingQueue也是一个阻塞队列,相比于ArrayBlockingQueue,他的底层是使用链表(单向链表)实现的,而且是一个可有界可无界的队列,在生产和消费的时候使用了两把锁,提高并发,是一个高效的阻塞队列。

    LinkedBlockingQueue底层的数据结构是链表,这一点很容易验证,在源码中,我们可以看到它有一个内部类Node,基本源码如下所示:

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        
        //链表节点定义    
        static class Node<E> {
            //节点中存放的值
            E item;
    
            //下一个节点
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    }   
    

    从上面的注释可以知道,当某个node节点的next节点为null的时候,说明当前节点是最后一个节点。

    LinkedBlockingQueue的基本成员属性如下代码所示:

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        /** 队列容量,最大为Integer.MAX_VALUE */
        private final int capacity;
    
        /** 队列长度 */
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * 头结点
         * Invariant: head.item == null
         */
        transient Node<E> head;
    
        /**
         * 尾结点
         * Invariant: last.next == null
         */
        private transient Node<E> last;
    
        /** 移除操作的锁,take/poll方法用到 */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** 移除操作需要等待的条件notEmpty,与takeLock绑定 */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** 入队操作的锁,put/offer方法用到 */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** 入队操作需要等待的条件notFull,与putLock绑定 */
        private final Condition notFull = putLock.newCondition();
    }   
    

    可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。

    2.1、构造函数

    • 容量大小可以由构造函数的capacity设定,默认为:Integer.MAX_VALUE
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
            
        public LinkedBlockingQueue() {
            // 调用有参构造函数,初始化容量capacity为int最大值
            this(Integer.MAX_VALUE);
        }
    
        public LinkedBlockingQueue(int capacity) {
            // 容量不能小于0,注意也不能等于0,这点与常规的集合不同
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            // 初始化头结点和尾结点为哑节点
            last = head = new Node<E>(null);
        }
        
        // 将已有的集合全部入队
        public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            // 获取到putLock锁
            final ReentrantLock putLock = this.putLock;
            // 加锁,保证线程安全
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    // 节点内的值不能为null
                    if (e == null)
                        throw new NullPointerException();
                    // 判断队列是否满了 
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    // 将Node节点添加到队列的尾部,last = last.next = new Node<E>(e);   
                    enqueue(new Node<E>(e));
                    ++n;
                }
                // 原子类设置Node节点个数,线程安全
                count.set(n);
            } finally {
                // 解锁
                putLock.unlock();
            }
        }   
    }   
    

    2.2、阻塞入队

    LinkedBlockingQueue提供的入队的方法有多个,包括add、offer、put。

    2.2.1、add(E e)方法

    其中add(E e)调用的就是offer(E e),offer方法入队成功返回true,入队失败(队列已满或者阻塞超时)会返回false,那么add方法调用offer方法返回false的话,那么就抛出异常,代码如下:

    public abstract class AbstractQueue<E>
        extends AbstractCollection<E>
        implements Queue<E> {
        
        public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    }
    

    2.2.2、offer(E e)方法

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        public boolean offer(E e) {
            // 如果存入的值为null,直接抛出空指针异常
            if (e == null) throw new NullPointerException();
            // 获取队列元素个数
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                //如果已经满了,直接返回失败
                return false;
            // 预先设置c为 -1,约定负数为入队失败  
            int c = -1;
            Node<E> node = new Node<E>(e);
            // 获取入队锁
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                //双重判断
                if (count.get() < capacity) {
                    //加入链表
                    enqueue(node);
                    c = count.getAndIncrement();
                    // 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
                    if (c + 1 < capacity)
                        //唤醒生产者线程,继续插入
                        // 如果添加数据后还队列还没有满,
                        //则继续调用notFull的signal方法唤醒其他等待在入队的线程,继续插入
                        notFull.signal();
                }
            } finally {
                // 释放锁
                putLock.unlock();
            }
            if (c == 0)
                //说明里面有一个元素,唤醒消费者
                signalNotEmpty();
            return c >= 0;
        }
    }   
    

    2.2.3、offer(E e, long timeout, TimeUnit unit)方法

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
            
        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            // 如果存入的值为null,直接抛出空指针异常
            if (e == null) throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            // 预先设置c为 -1,约定负数为入队失败
            int c = -1;
            // 获取入队锁
            final ReentrantLock putLock = this.putLock;
            // 获取队列元素个数
            final AtomicInteger count = this.count;
            // 加锁
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    // 如果超时时间过了队列仍然是满的话就直接返回false
                    if (nanos <= 0)
                        return false;
                    // 否则调用awaitNanos等待,超时会返回<= 0L  
                    nanos = notFull.awaitNanos(nanos);
                }
                // 如果上述没有阻塞,也就是队列没有满,那么这里直接入队
                enqueue(new Node<E>(e));
                // 队列元素个数+1,此时c为元素入队前的个数,也就是比当前队列元素个数少1
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    // 如果添加数据后还队列还没有满,
                    //则继续调用notFull的signal方法唤醒其他等待在入队的线程
                    notFull.signal();
            } finally {
                // 释放锁
                putLock.unlock();
            }
            // c==0说明队列中有一个元素了,那么就需要唤醒其他正在等待出队的线程
            // 这一点可能不好理解,c = count.getAndIncrement();理解了就差不多
            if (c == 0)
                signalNotEmpty();
            return true;
        }
    }   
    

    我们一起总结一下上述的入队源码:

    • 1、入队第一步,上锁,这样保证了线程安全,保证了同一时刻只能有一个入队线程在操作队列。

    • 2、如果队列满了,那么会产生阻塞,如果阻塞时间过了,队列依旧是满的,那么将返回false,放弃入队。

    • 3、如果队列没有满,那么直接将入队元素加入到队列的尾部,然后检查当前队列是否满了,如果没有满,则唤醒其他入队线程。

    • 4、最后检查入队前的队列是否为空(c==0就表示当前入队操作前,是一个空队列),如果为空,那么就有可能存在等待出队的线程在阻塞着,那么在这里进行唤醒。

    2.2.4、put(E e)方法

    对于put方法,它也是入队的一个方法,这个方法和offer方法原理几乎一致,最大的区别在于put方法没有阻塞超时时间,如果队列满了,那么执行put方法的线程将一直阻塞下去。

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        public void put(E e) throws InterruptedException {
            // 如果存入的值为null,直接抛出空指针异常
            if (e == null) throw new NullPointerException();
            // 预先设置c为 -1,约定负数为入队失败
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            // 使用AtomicInteger保证原子性
            final AtomicInteger count = this.count;
            // 获取put锁
            putLock.lockInterruptibly();
            try {
                // 如果队列满了,则进入put条件队列等待
                while (count.get() == capacity) {
                    notFull.await();
                }
                // 队列不满,或者被取数线程唤醒了,那么会继续执行
                // 这里会往阻塞队列末尾添加一个数据
                enqueue(node);
                c = count.getAndIncrement();
                // 如果队列不满,则唤醒等待时间最长的put线程
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                // 释放put锁
                putLock.unlock();
            }
            // 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程
            if (c == 0)
                signalNotEmpty();
        }
        
        //直接放到链表的尾部
        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }   
    }   
    

    2.3、阻塞出队

    2.3.1、remove()方法

    public abstract class AbstractQueue<E>
        extends AbstractCollection<E>
        implements Queue<E> {
        
        public E remove() 
            // 调用poll()方法出队
            E x = poll();
            if (x != null)
                // 如果有元素出队就返回这个元素
                return x;
            else
                // 如果没有元素出队就抛出异常
                throw new NoSuchElementException();
        }
    }
    

    2.3.2、poll()方法

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
            
        public E poll() {
            final AtomicInteger count = this.count;
            //如果队列为空,直接返回空
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            // 获取take锁
            final ReentrantLock takeLock = this.takeLock;
            // 上锁
            takeLock.lock();
            try {
                // 如果队列不空
                if (count.get() > 0) {
                    //调用dequeue获取队列中的数据
                    x = dequeue();
                    // 阻塞队列数量减1
                    c = count.getAndDecrement();
                    // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
                    if (c > 1)
                        // 释放take锁
                        notEmpty.signal();
                }
            } finally {
                // 解锁
                takeLock.unlock();
            }
            // 如果c == capacity就是说队列中有一个空位,唤醒入队线程
            if (c == capacity)
                signalNotFull();
            return x;
        }
    }   
    

    2.3.3、poll(long timeout, TimeUnit unit)方法

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E x = null;
            int c = -1;
            long nanos = unit.toNanos(timeout);
            final AtomicInteger count = this.count;
            // 获取take锁
            final ReentrantLock takeLock = this.takeLock;
            // 上锁
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    // 如果队列空了,则进入take条件队列等待
                    // 且如果阻塞时间过期,那么将返回null
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                // 在超时时间内返回,则调用dequeue获取队列中的数据
                x = dequeue();
                // 阻塞队列数量减1
                c = count.getAndDecrement();
                // 如果c > 1,说明队列中还有节点元素,那么继续唤醒其他出队线程
                if (c > 1)
                    notEmpty.signal();
            } finally {
                // 解锁
                takeLock.unlock();
            }
            // 如果c == capacity就是说队列中有一个空位,唤醒入队线程
            if (c == capacity)
                signalNotFull();
            return x;
        }
    }   
    

    2.3.4、take()方法

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            // 获取take锁
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                // 如果队列空了,则进入take条件队列等待
                while (count.get() == 0) {
                    notEmpty.await();
                }
                // 获取到第一个节点,非哑节点
                x = dequeue();
                // 阻塞队列数量减1
                c = count.getAndDecrement();
                // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
                if (c > 1)
                    notEmpty.signal();
            } finally {
                // 释放take锁
                takeLock.unlock();
            }
            // 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程
            if (c == capacity)
                signalNotFull();
            return x;
        }
        
        //通过这个方法可以看出,链表的首节点的值是null,每次获取元素的时候
        //先把首节点干掉,然后从第二个节点获取值
        private E dequeue() {
            Node<E> h = head;
            // 获取第一个元素结点first
            Node<E> first = h.next;
            // 将头结点自引用,并被垃圾回收掉
            h.next = h; // help GC
            // 将头结点指向第一个元素结点first
            head = first;
            // 获取第一个元素结点的值
            E x = first.item;
            // 将第一个元素结点的值置为null,成为新的哑节点
            first.item = null;
            // 返回被移除的节点元素值
            return x;
        }   
    }   
    

    take和put操作如下图所示:


    • 1、队列第一个节点为哑节点,占位用的;
    • 2、put操作一直往链表后面追加节点;
    • 3、take操作从链表头取节点;

    三、ArrayBlockingQueue与LinkedBlockingQueue对比

    队列 是否阻塞 是否有界 线程安全 适用场景
    ArrayBlockingQueue 一把ReentrantLock锁 生产消费模型,平衡处理速度
    LinkedBlockingQueue 可配置 两把ReentrantLock锁 生产消费模型,平衡处理速度

    3.1、ArrayBlockingQueue

    • 数据结构:数组,存储空间预先分配,无需动态申请空间,使用过程中内存开销较小;

    3.2、LinkedBlockingQueue:

    • 数据结构:单项链表,存储空间动态申请,会增加JVM垃圾回收负担;
    • 两把锁,并发性能较好;
    • 可设置为无界,吞吐量比较大,但是不稳定,入队速度太快有可能导致内存溢出。

    参考:
    https://www.itzhai.com/articles/graphical-blocking-queue.html

    https://segmentfault.com/a/1190000039174436

    https://cloud.tencent.com/developer/article/1609320

    相关文章

      网友评论

        本文标题:Java并发编程——LinkedBlockingQueue

        本文链接:https://www.haomeiwen.com/subject/azbgfrtx.html