美文网首页
[并发集合] LinkedBlockingQueue源码分析

[并发集合] LinkedBlockingQueue源码分析

作者: LZhan | 来源:发表于2020-01-03 17:32 被阅读0次

    转自 公众号【彤哥说源码】https://mp.weixin.qq.com/s/y6PoK3UbVLwdZoauLX8nsQ

    1 前言

    LinkedBlockingQueue是java并发包下一个以单链表实现的阻塞队列,它是线程安全的。

    2 源码分析

    2.1 重要属性

        /** The capacity bound, or Integer.MAX_VALUE if none */
        // 容量
        private final int capacity;
    
        /** Current number of elements */
        // 元素数量
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
        // 链表头
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
        // 链表尾
        private transient Node<E> last;
    
        /** Lock held by take, poll, etc */
        // take 锁
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */、
        // notEmpty条件
        // 当队列没有元素时,take锁会阻塞在notEmpty条件上,等待其他线程唤醒
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        // put 锁
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        // notFull条件
        // 当队列满了,put锁会阻塞在notFull上,等待其他线程唤醒
        private final Condition notFull = putLock.newCondition();
    

    (1)capacity,有容量,可以理解为LinkedBlockingQueue是有界队列
    (2)head, last,链表头、链表尾指针
    (3)takeLock,notEmpty,take锁及其对应的条件
    (4)putLock, notFull,put锁及其对应的条件
    (5)入队、出队使用两个不同的锁控制,锁分离,提高效率

    内部类

        static class Node<E> {
            E item;
            Node<E> next;
            Node(E x) { item = x; }
        }
    

    只有next指针,是单链表结构。

    2.2 构造方法

        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            // 初始化head和last指针为空值节点
            last = head = new Node<E>(null);
        }
    

    如果没有传容量,就会将容量初始化为最大int值

    2.3 入队之put方法

        public void put(E e) throws InterruptedException {
            // 不允许null元素
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            // 新建一个节点
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            // 使用put锁加锁
            putLock.lockInterruptibly();
            try {
                // 如果队列满了,就阻塞在notFull条件上
                // 等待被其他线程唤醒
                while (count.get() == capacity) {
                    notFull.await();
                }
                // 队列不满,入队
                enqueue(node);
                // 队列长度加1
                c = count.getAndIncrement();
                // 如果现队列长度小于容量,就再唤醒一个阻塞在notFull条件上的线程
                // 这里唤醒是因为可能有很多线程阻塞在notFull上,直接在这里signal()不会重复加锁
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    

    这里只有在put之前,队列里面没有元素,才会调用signalNotEmpty方法

    enqueue方法
    直接加到 last 后面

        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    

    2.4 出队之take方法

        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            // 使用takeLock加锁
            takeLock.lockInterruptibly();
            try {
                // 如果队列无元素,则阻塞在notEmpty条件上
                while (count.get() == 0) {
                    notEmpty.await();
                }
                // 否则出队
                x = dequeue();
                // 获取出队前队列的长度
                c = count.getAndDecrement();
                // 如果取之前队列长度大于1,则唤醒notEmpty
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            // 如果取之前队列长度等于容量
            // 则唤醒notFull
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    如下图所示,在take()方法中,唤醒notFull线程的条件很苛刻,就是在取元素之前,队列中元素个数是与容量capacity一致,其余的情况就不会再去signalNotFull
    那么为什么不去调用呢?因为使用signalNotFull方法需要获取putLock,减少锁的次数。

    3 总结

    (1)LinkedBlockingQueue采用单链表的形式实现;
    (2)LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞;
    (3)LinkedBlockingQueue是有界队列,不传入容量时默认为最大int值;

    面试问题
    1.LinkedBlockingQueue与ArrayBlockingQueue对比?
    a)后者入队出队采用一把锁,导致入队出队相互阻塞,效率低下;
    b)前者入队出队采用两把锁,入队出队互不干扰,效率较高;
    c)二者都是有界队列,如果长度相等且出队速度跟不上入队速度,都会导致大量线程阻塞;
    d)前者如果初始化不传入初始容量,则使用最大int值,如果出队速度跟不上入队速度,会导致队列特别长,占用大量内存;

    相关文章

      网友评论

          本文标题:[并发集合] LinkedBlockingQueue源码分析

          本文链接:https://www.haomeiwen.com/subject/ewozoctx.html