美文网首页并发编程
J.U.C之阻塞队列(二)

J.U.C之阻塞队列(二)

作者: 今年五年级 | 来源:发表于2019-10-25 09:55 被阅读0次

    LinkedBlockingQueue

    与ArrayBlockingQueue区别

    linkedBlockingQueue与ArrayBlockingQueue主要区别为

    1. ArrayBlockingQueue是数组结构的队列,是有界队列
    2. LinkedBlockingQueue是链表结构的队列,可以是有界队列也可以是无界队列(capacity=Integer.MAX_VALUE)

    主要属性

        //队列容量
        private final int capacity;
        //队列元素个数
        private final AtomicInteger count = new AtomicInteger();
        //对头
        transient Node<E> head;
        //队尾
        private transient Node<E> last;
        //take,poll,peek等读操作的方法需要获取读锁
        private final ReentrantLock takeLock = new ReentrantLock();
        //如果读操作的时候队列为空,则等待不空条件
        private final Condition notEmpty = takeLock.newCondition();
        //put,offer等写操作的方法需要获取写锁
        private final ReentrantLock putLock = new ReentrantLock();
        //如果写操作的时候队列为满,则等待不满条件
        private final Condition notFull = putLock.newCondition();
    

    锁和条件的搭配关系:如要读取,则不仅要获取读锁,而且还要满足队列不为空的条件


    未命名文件 (11).png

    从上图可以看出来,读操作和写操作单独看都是安全的,并发问题在于一个写操作和一个读操作同时进行

    源码分析

    构造方法

        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    

    初始化的时候会构造一个空节点,第一个元素入队的时候,队列中会有两个元素,读取元素时,也总是会获取头节点后面的一个节点,count的计数值不包括这个头节点

    put(E e)

        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            //将待插入的元素构造为一个Node节点
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            //获取队列中元素个数
            final AtomicInteger count = this.count;
            //用写锁加一个可以被中断抛出异常的锁
            putLock.lockInterruptibly();
            try {
                //当队列中容量满则等待不满的条件
                while (count.get() == capacity) {
                    //释放锁+park+被唤醒(signal or interrupt)
                    notFull.await();
                }
                //此时被通知不满开始入队
                enqueue(node);
                //先返回队列中元素的数量给c再给队列元素数量+1
                c = count.getAndIncrement();
                //如果这个元素入队后还有位置可以插入
                if (c + 1 < capacity)
                    //唤醒等待在不满的条件的线程
                    notFull.signal();
            } finally {
                //入队后,释放掉写锁
                putLock.unlock();
            }
            //c==0,则代表队列这个元素入队列之前是空的(不包括head节点)
            if (c == 0)
                //这里做一次唤醒操作让其他线程读取新加入的元素
                signalNotEmpty();
        }
    
        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    

    signalNotEmpty()

    private void signalNotEmpty() {
            //获取读锁
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                //唤醒线程来读取
                notEmpty.signal();
            } finally {
                //释放读锁
                takeLock.unlock();
            }
        }
    

    take()

     public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            //获取到读锁才能进行出队操作
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                //出队
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    //出队以后队列还不为空,唤醒等待在不为空队列的线程
                    notEmpty.signal();
            } finally {
                //出队释放掉读锁
                takeLock.unlock();
            }
            //读取发生之前满了
            if (c == capacity)
                //刚刚进行了一次出队操作,已经有位置了,唤醒写线程往里面写入
                signalNotFull();
            return x;
        }
    
        private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // help GC
            //设置这个节点Wie新的头节点
            head = first;
            E x = first.item;
            first.item = null;
            return x;
        }
    

    signalNotFull()

    类比上面的signalNotEmpty

        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
    

    PriorityBlockingQueue

    带优先级(排序)的blockingQueue实现

    相关文章

      网友评论

        本文标题:J.U.C之阻塞队列(二)

        本文链接:https://www.haomeiwen.com/subject/uuqdvctx.html