美文网首页IT技术
10. LinkedBlockingQueue

10. LinkedBlockingQueue

作者: shallowinggg | 来源:发表于2019-03-20 17:14 被阅读0次

    LinkedBlockingQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

    LinkedBlockingQueue在内部将元素存储在链接结构(链接节点)中。如果需要,该链接结构可以具有一个上限。如果未指定上限,则使用Integer.MAX_VALUE作为上限。

    LinkedBlockingQueue内部将元素以FIFO(先入先出)次序存储。队列的头部是已在队列中的时间最长的元素,队列的尾部是已在队列中的时间最短的元素。

    以下是如何实例化和使用LinkedBlockingQueue

    BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
    BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);
    
    bounded.put("Value");
    
    String value = bounded.take();
    

    源码

    LinkedBlockingQueue内部使用了一个单向链表,同时它提供了两个锁,一个用于获取并删除元素,一个用于增加元素。count字段使用原子变量,避免修改它时需要同时获取两个锁。

    static class Node<E> {
        E item;
    
        /**
         * 下面中的一个:
         * - 真实的后继节点
         * - 这个节点本身,此时原后继节点现在是head.next,即第一个元素
         * - null, 意味没有后继节点,此节点是队列最后一个节点
         */
        Node<E> next;
    
        Node(E x) { item = x; }
    }
    
    private final int capacity;
    
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    
    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;
    
    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;
    
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    

    增加操作

    注意进行增加操作时,只对putLock加锁,如果还对takeLock也进行加锁,那么就会影响性能。同时,为了弥补此方法带来的后果,count使用原子变量,进行CAS更新,防止数据不一致。

    为了提升性能,在增加元素成功后,如果队列还没有满,那么便唤醒其他因队列满而被阻塞的插入线程。

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 注意即使count没有被锁保护,它依然可以被用作等待条件
            // 判定。因为此时count只会被减少(putLock已加锁),如果容量
            // 改变,会被唤醒。count在其他地方的使用也与此相似。
    
            // 队列已满,阻塞自己
            while (count.get() == capacity) {
                notFull.await();
            }
            // 插入队列中
            enqueue(node);
            // CAS更新count值
            c = count.getAndIncrement();
            // 如果队列没满,唤醒其他等待插入的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // 如果队列原来是空队列,唤醒等待提取元素的线程
        if (c == 0)
            signalNotEmpty();
    }
    
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
    
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        // 先加锁,才能调用对应Condtion的signal()方法
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // 队列已满,返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 等待-超时机制
            while (count.get() == capacity) {
                if (nanos <= 0L)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    

    删除操作

    删除操作与增加操作一样。

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 当队列为空,阻塞自己
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 将头节点出队
            x = dequeue();
            c = count.getAndDecrement();
            // 如果队列还有元素,唤醒其他等待提取元素的线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 如果原本队列是满的,唤醒增加线程,因为现在元素已经被取出,队列不满
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
    
        // 头节点为空,其中不存储元素
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0L)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    // 删除一个指定元素
    public boolean remove(Object o) {
        if (o == null) return false;
        // 将两个锁全部加锁
        fullyLock();
        try {
            for (Node<E> pred = head, p = pred.next;
                 p != null;
                 pred = p, p = p.next) {
                if (o.equals(p.item)) {
                    // 从队列中移除此节点
                    unlink(p, pred);
                    return true;
                }
            }
            return false;
        } finally {
            // 释放全部两个锁
            fullyUnlock();
        }
    }
    
    void unlink(Node<E> p, Node<E> pred) {
        // assert putLock.isHeldByCurrentThread();
        // assert takeLock.isHeldByCurrentThread();
        // p.next没有被设置为null,为了保证迭代器遍历到p时继续工作,
        // 保证弱一致性
        p.item = null;
        pred.next = p.next;
        if (last == p)
            last = pred;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
    

    访问操作

    public E peek() {
        // 队列为空,返回null
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 返回第一个元素
            return (count.get() > 0) ? head.next.item : null;
        } finally {
            takeLock.unlock();
        }
    }
    
    

    其他操作

    public void clear() {
        fullyLock();
        try {
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                // 使得next指向自己
                h.next = h;
                // 解除对元素实体的引用
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            // 如果原来队列是满的,唤醒等待的插入线程
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }
    
    
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
    
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 获取当前队列中的元素数量
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                // 将n个元素加入到指定集合中
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }
    
    

    迭代器

    LinkedBlockingQueue的迭代器与DelayQueue的不同,DelayQueue的迭代器与原组件没有任何的一致性,而LinkedBlockingQueue的迭代器与内部的链表保持了弱一致性。

    注意它的next()方法,它会跳过内容为null的节点,回忆前面删除操作中的remove(Object)方法,他没有修改节点的next字段,如果修改了,迭代器就会无法正常工作,而为了保证一致性,迭代器也需要跳过这个空节点。

    而它的forEachRemaining(Consumer<? super E> action)方法是分批次进行处理的,每批64个元素,如果数量小于64,那就使用此数量。

    private class Itr implements Iterator<E> {
        private Node<E> next;           // 持有nextItem的节点
        private E nextItem;             // 下一个进行处理的元素
        private Node<E> lastRet;        // 上一个返回的元素,即当前正在使用的
        private Node<E> ancestor;       // Helps unlink lastRet on remove()
    
        Itr() {
            fullyLock();
            try {
                // 保存第一个元素
                if ((next = head.next) != null)
                    nextItem = next.item;
            } finally {
                fullyUnlock();
            }
        }
    
        public boolean hasNext() {
            return next != null;
        }
    
        public E next() {
            Node<E> p;
            if ((p = next) == null)
                throw new NoSuchElementException();
            lastRet = p;
            E x = nextItem;
            fullyLock();
            try {
                E e = null;
                // 注意此处,遇到空节点会跳过去访问下一个节点
                for (p = p.next; p != null && (e = p.item) == null; )
                    p = succ(p);
                next = p;
                nextItem = e;
            } finally {
                fullyUnlock();
            }
            return x;
        }
        
        Node<E> succ(Node<E> p) {
            // 正常出队的元素next字段会指向自己
            if (p == (p = p.next))
                p = head.next;
            return p;
        }
        
        public void forEachRemaining(Consumer<? super E> action) {
            // A variant of forEachFrom
            Objects.requireNonNull(action);
            Node<E> p;
            if ((p = next) == null) return;
            lastRet = p;
            next = null;
            final int batchSize = 64;
            Object[] es = null;
            int n, len = 1;
            do {
                fullyLock();
                try {
                    if (es == null) {
                        p = p.next;
                        // 获取真正存在的元素的数量,如果多于64,分批进行,一批为64个
                        for (Node<E> q = p; q != null; q = succ(q))
                            if (q.item != null && ++len == batchSize)
                                break;
                        es = new Object[len];
                        es[0] = nextItem;
                        nextItem = null;
                        n = 1;
                    } else
                        n = 0;
                    // n为1的使用只因为p=p.next,经过此步后p已经不是首元素,
                    // 而是第二个元素。而后面批次的插入直接从0开始即可
                    // 将元素放入数组中
                    for (; p != null && n < len; p = succ(p))
                        if ((es[n] = p.item) != null) {
                            lastRet = p;
                            n++;
                        }
                } finally {
                    fullyUnlock();
                }
                // 分别调用accept方法
                for (int i = 0; i < n; i++) {
                    @SuppressWarnings("unchecked") E e = (E) es[i];
                    action.accept(e);
                }
            } while (n > 0 && p != null);
        }
    
        public void remove() {
            // 获取当前元素
            Node<E> p = lastRet;
            if (p == null)
                throw new IllegalStateException();
            lastRet = null;
            fullyLock();
            try {
                if (p.item != null) {
                    if (ancestor == null)
                        ancestor = head;
                    // 获取p的前驱结点
                    ancestor = findPred(p, ancestor);
                    // 从链表中删除结点p
                    unlink(p, ancestor);
                }
            } finally {
                fullyUnlock();
            }
        }
    }
    
    

    测试:

    import org.junit.Test;
    
    import java.util.Iterator;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class LinkedBlockingQueueTest {
        private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    
        @Test
        public void test() {
            queue.offer("1");
            queue.offer("2");
            queue.offer("3");
            queue.offer("4");
    
            Iterator<String> itr = queue.iterator();
            queue.remove("3");
            itr.forEachRemaining(System.out::println);
        }
    }
    

    输出如下:

    1
    2
    4

    核心要点

    1. 内部使用一个单向链表,以FIFO顺序存储
    2. 可以在链表两头同时进行操作,所以使用两个锁分别保护
    3. 插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;提取线程同理。
    4. 迭代器与单向链表保持弱一致性,调用remove(T)方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。
    5. 迭代器的forEachRemaining(Consumer<? super E> action)以64个元素为一批进行操作

    相关文章

      网友评论

        本文标题:10. LinkedBlockingQueue

        本文链接:https://www.haomeiwen.com/subject/hedimqtx.html