美文网首页
LinkedBlockQueue和LinkedBlockDequ

LinkedBlockQueue和LinkedBlockDequ

作者: 无聊之园 | 来源:发表于2019-01-11 15:08 被阅读0次

    动机起源:来自于有人问这两个堵塞队列哪个效率高一点。
    先说答案:LinkedBlockQueue高一点。而且,这两个东西有很大的不同

    一、先看LinkedBlockQueue
    LInkedBlockQueue有两把锁,take锁和put锁,所以take和put可以并发。
    因为:linkedBlockQueque维护的是单链表,添加的时候,只添加到尾部,获取的时候从头部获取,所以take和put的入队和出队操作不会有并发错误:
    入队的时候,分两步:1、last.next = node,这一步对出队操作没有任何影响。
    2、last = last.next,这个操作,对出队也没有影响。
    所以入队只操作tail和tail的next,出队的时候只操作head,而且不是移除head的下一个元素,而是移除head节点,然后把head的下一个元素变成head的节点,所以入队出队互相没有并发影响。
    这里假设,如果是双向链表,则肯定会有并发操作,因为,会操作两边的链表,入队和出队有交叉,所以有并发操作。
    如果是单向链表,移除的是head的下一个元素也会存在并发问题。

    private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // help GC
            head = first;
            E x = first.item;
            first.item = null;
            return x;
        }
    

    研究其put和take方法的源代码,代码里附有注释。

    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            // put 锁:private final ReentrantLock putLock = new ReentrantLock();
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            // 自旋请求lock,lock和lockInterruptibly的取别是,lockInterruptibly可线程中断,如果线程Interruptibly,则不再block等待
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                 // 容量已经满了
            // 这里为什么要while?因为被唤醒,不一定就立即获得锁
            // 如果被唤醒没有获得锁,被别的put线程获得锁了,然后
            // 马上添加了一个元素,则被唤醒的线程,获得锁之后,则
            // 容量已经满了,所以需要while继续等待
            // 所以对于多线程wait操作,一定不要忘了wait操作用while循环做
                 // private final Condition notFull = putLock.newCondition(); condition用于并发锁实现await,sign的功能
                while (count.get() == capacity) {
                    notFull.await();
                }
    // 入队
                enqueue(node);
                // 获取当前大小,并且当前大小加1,注意这个c是入队前的大小
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    // 还有空余位置, 则唤醒其他线程,注意,put操作会唤醒put操作,而不是我们以为的take操作唤醒put操作,当然,如果put满了,则是take操作唤醒put操作
                    notFull.signal();
            } finally {
                // 释放锁
                putLock.unlock();
            }
            // 之前队列大小为0,现在添加了数据,有数据,则唤醒take锁
            if (c == 0)
                signalNotEmpty();
        }
        
    
    public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
    // take使用的是take锁,也就是take和put是可以并发的
            final ReentrantLock takeLock = this.takeLock;
            // 自旋,获取take锁
            takeLock.lockInterruptibly();
            try {
                // 没有数据,则等待,如put操作
                while (count.get() == 0) {
                    notEmpty.await();
                }
    // 出队
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    // 还有数据,则唤醒其他线程take数据
                    notEmpty.signal();
            } finally {
                // 释放锁
                takeLock.unlock();
            }
            // 之前数据已经饱和,现在取出了一个数据,所以唤醒put锁
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    移除指定位置的元素,自然元素前后指针都会受到影响,前指针跟get有关,后指针跟put有关,所以,锁两把锁

    // 移除操作,为什么是锁两把锁,因为移除元素操作对put和take操作都有影响。
         public boolean remove(Object o) {
            if (o == null) return false;
            fullyLock();
            try {
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (o.equals(p.item)) {
                        unlink(p, trail);
                        return true;
                    }
                }
                return false;
            } finally {
                fullyUnlock();
            }
        }
    

    总结:put操作的时候,如果还有线程堵塞,并且还有空余位置,则put操作会唤醒put操作,如此递归,直到所有put操作都执行了。
    take操作也是如此,如果还有现车给堵塞take操作,并且还有值,则take操作会唤醒take操作,如此递归,直到所有take操作都执行完了。
    当然,当put操作满了之后,put操作也会唤醒take操作,。
    同理,当take操作空了之后,也会唤醒put操作。

    二、再来看linkedBlockDequeue
    同样分析其放和取的方法

    public void push(E e) {
            addFirst(e);
        }
           public void addFirst(E e) {
               // 这里可以可能到,如果offerFisrt方法调用返回false,则直接报deque 已经满了的错误,而offerFirst方法是没有await操作的。
            if (!offerFirst(e))
                throw new IllegalStateException("Deque full");
        }
        
        public boolean offerFirst(E e) {
            if (e == null) throw new NullPointerException();
            Node<E> node = new Node<E>(e);
            // final ReentrantLock lock = new ReentrantLock();
            // 和linkedBlockQueue不同的,linkedBlockQueue有两把锁,putlock,tacklock,linkedBlockDeque只有一把锁
            // 
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return linkFirst(node);
            } finally {
                lock.unlock();
            }
        }
        
        private boolean linkFirst(Node<E> node) {
            // assert lock.isHeldByCurrentThread();
            // 如果已经满了,直接返回false,不wait,可见这个东西并不堵塞,满了直接报错。
            if (count >= capacity)
                return false;
            Node<E> f = first;
            node.next = f;
            first = node;
            if (last == null)
                last = node;
            else
                f.prev = node;
            ++count;
            notEmpty.signal();
            return true;
        }
    

    其他的方法比如add() addLast 等方法,都是类似,不堵塞,直接报错。

    // 可以看到,getfirst方法,同样,不堵塞,直接报错
           public E getFirst() {
            E x = peekFirst();
            if (x == null) throw new NoSuchElementException();
            return x;
        }
        public E peekFirst() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (first == null) ? null : first.item;
            } finally {
                lock.unlock();
            }
        }
    
    // 顺便看看clear方法,锁住之后,一个一个元素,前指针和后指针置为空,其实双向链表结构,头和尾指向空,就能达到同样的效果,而他这样做,只是为了便于gc快速回收
        public void clear() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                for (Node<E> f = first; f != null; ) {
                    f.item = null;
                    Node<E> n = f.next;
                    f.prev = null;
                    f.next = null;
                    f = n;
                }
                first = last = null;
                count = 0;
                notFull.signalAll();
            } finally {
                lock.unlock();
            }
        }
    

    三、总结、linkedBlcokQueue,是一个堵塞队列,fifo,有两把锁,takelock, putlock。有两把锁,意味着,其take和put操作,当每到临界值的时候,是并行的,思想上因为:不到临界值,put和take其实是互不相干的。
    而linkedBlockDeque,一把锁,则是串行的。故,单纯论效率,linkedBlcokQueue比linkedBlockDeque高。
    linekdBlockDeQue,不是一个堵塞队列,但是也是一个线程安全的队列,队列满了,put直接报错,队列空了,get也直接报错。

    相关文章

      网友评论

          本文标题:LinkedBlockQueue和LinkedBlockDequ

          本文链接:https://www.haomeiwen.com/subject/jqyxdqtx.html