美文网首页
Java并发-ArrayBlockingQueue源码分析

Java并发-ArrayBlockingQueue源码分析

作者: 宛丘之上兮 | 来源:发表于2018-12-10 18:54 被阅读17次
    先看下ArrayBlockingQueue类图

    ArrayBlockingQueue使用场景

    我们先写个程序看下如何使用ArrayBlockingQueue:

    public class CB {
        public static void main(String[] args) {
            int capacity = 10;
            ArrayBlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(capacity);
    
            for (int i = 0; i < 20; i++) {
                new Thread(new Producer(queue)).start();
            }
            new Thread(new Consumer(queue)).start();
        }
    
        static class Producer implements Runnable {
    
            //容器
            private final ArrayBlockingQueue<Bread> queue;
    
            public Producer(ArrayBlockingQueue<Bread> queue) {
                this.queue = queue;
            }
    
            /* (non-Javadoc)
             * @see java.lang.Runnable#run()
             */
            @Override
            public void run() {
                while (true) {
                    produce();
                }
            }
    
            public void produce() {
                /**
                 * put()方法是如果容器满了的话就会把当前线程挂起
                 * offer()方法是容器如果满的话就会返回false,也正是我在前一篇中实现的那种策略。
                 */
                try {
                    Bread bread = new Bread();
                    Thread.sleep(2000);
                    queue.put(bread);
                    System.out.println("Producer:" + bread);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Consumer implements Runnable {
    
            //容器
            private final ArrayBlockingQueue<Bread> queue;
    
            public Consumer(ArrayBlockingQueue<Bread> queue) {
                this.queue = queue;
            }
    
            /* (non-Javadoc)
             * @see java.lang.Runnable#run()
             */
            @Override
            public void run() {
                while (true) {
                    consume();
                }
            }
    
            public void consume() {
                /**
                 * take()方法和put()方法是对应的,从中拿一个数据,如果拿不到线程挂起
                 * poll()方法和offer()方法是对应的,从中拿一个数据,如果没有直接返回null
                 */
                try {
                    Thread.sleep(2000);
                    Bread bread = queue.take();
                    System.out.println("consumer:" + bread);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Bread {
            long time;
            Bread() {
                this.time = System.currentTimeMillis() / 1000;
            }
            @Override
            public String toString() {
                return String.valueOf(time);
            }
        }
    }
    

    这是一个生产者消费者模型的例子,代码中定义了20个生产者线程,1个消费者线程,产品池中最多容纳10个面包,输出如下:

    Producer:1544429143
    Producer:1544429143
    Producer:1544429143
    Producer:1544429143
    Producer:1544429143
    Producer:1544429143
    Producer:1544429143
    Producer:1544429143
    Producer:1544429143
    consumer:1544429143
    Producer:1544429143
    Producer:1544429143
    consumer:1544429143
    Producer:1544429145
    

    可以看到,生产和消费能够正确并发执行。

    ArrayBlockingQueue构造器和成员变量

    提供了三个构造器:

        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
        public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }
    

    可以看出,这个类本质是一个由数组支持的有界阻塞队列,其加锁机制还是依赖ReentrantLock。
    类有以下成员变量:

    //真正保存数据的数组
    final Object[] items;
    ///** take, poll, peek or remove的下一个索引 */  
    int takeIndex;
    ///** put, offer, or add的下一个索引 */  
    int putIndex;
    //元素个数
    int count;
    //可重入锁
    final ReentrantLock lock;
    //队列不为空的条件
    private final Condition notEmpty;
    //队列未满的条件
    private final Condition notFull;
    //遍历使用的迭代器
    transient Itrs itrs = null;
    

    ArrayBlockingQueue核心方法

    有以下核心方法:

    • 放入数据
      1. add(object)队列没满的话,放入成功。否则抛出异常。
      2. offer(object):表示如果可能的话,将object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
      3. offer(E o, long timeout, TimeUnit unit)可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
      4. put(object)把object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程阻塞。直到BlockingQueue里面有空间再继续.
    • 获取数据
      1. poll()取走BlockingQueue里排在首位的对象,
      2. poll(long timeout, TimeUnit unit)从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
      3. take()取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
      4. drainTo()一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    存放数据方法

    看下add(object)源码:

        public boolean add(E e) {
            return super.add(e);
        }
    

    super.add(e)源码在父类AbstractQueue中:

        public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    

    所以看下offer的实现:

        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    

    还有一个具有超时功能的offer(Object, timeout, timeUnit)方法:

        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }
    

    上面的三个方法都是非阻塞的,下面的put方法是阻塞的:

        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    

    可以看到,数据满的话会进入notFull.await()状态等待唤醒,然后执行enqueue(E x)

        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }
    

    enqueue方法会给一些变量赋值,比如非空信号量notEmpty、元素个数count、下一个加入的索引putIndex。

    获取数据方法

    poll()不会抛出异常,当元素个数为0是,返回null,否则,调用dequeue函数,并唤醒等待notFull条件的线程并返回,与offer相对应:

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }
    

    take函数用于从ArrayBlockingQueue中获取一个元素,在当前线程被中断时会抛出异常,并且当队列为空时,会阻塞一直等待,其与put函数相对应。

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    ArrayBlockingQueue遍历

    ArrayBlockingQueue提供了两个内部类辅助遍历:ItrItrs
    Itrs源码:

        class Itrs {
            private class Node extends WeakReference<Itr> {
                Node next;
    
                Node(Itr iterator, Node next) {
                    super(iterator);
                    this.next = next;
                }
            }
    
            /** Incremented whenever takeIndex wraps around to 0 */
            int cycles = 0;
            /** Linked list of weak iterator references */
            private Node head;
            /** Used to expunge stale iterators */
            private Node sweeper = null;
            private static final int SHORT_SWEEP_PROBES = 4;
            private static final int LONG_SWEEP_PROBES = 16;
    
            Itrs(Itr initial) {
                register(initial);
            }
    
            void doSomeSweeping(boolean tryHarder) {
                // assert lock.getHoldCount() == 1;
                // assert head != null;
                int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
                Node o, p;
                final Node sweeper = this.sweeper;
                boolean passedGo;   // to limit search to one full sweep
    
                if (sweeper == null) {
                    o = null;
                    p = head;
                    passedGo = true;
                } else {
                    o = sweeper;
                    p = o.next;
                    passedGo = false;
                }
    
                for (; probes > 0; probes--) {
                    if (p == null) {
                        if (passedGo)
                            break;
                        o = null;
                        p = head;
                        passedGo = true;
                    }
                    final Itr it = p.get();
                    final Node next = p.next;
                    if (it == null || it.isDetached()) {
                        // found a discarded/exhausted iterator
                        probes = LONG_SWEEP_PROBES; // "try harder"
                        // unlink p
                        p.clear();
                        p.next = null;
                        if (o == null) {
                            head = next;
                            if (next == null) {
                                // We've run out of iterators to track; retire
                                itrs = null;
                                return;
                            }
                        }
                        else
                            o.next = next;
                    } else {
                        o = p;
                    }
                    p = next;
                }
    
                this.sweeper = (p == null) ? null : o;
            }
    
            void register(Itr itr) {
                // assert lock.getHoldCount() == 1;
                head = new Node(itr, head);
            }
    
            void takeIndexWrapped() {
                // assert lock.getHoldCount() == 1;
                cycles++;
                for (Node o = null, p = head; p != null;) {
                    final Itr it = p.get();
                    final Node next = p.next;
                    if (it == null || it.takeIndexWrapped()) {
                        // unlink p
                        // assert it == null || it.isDetached();
                        p.clear();
                        p.next = null;
                        if (o == null)
                            head = next;
                        else
                            o.next = next;
                    } else {
                        o = p;
                    }
                    p = next;
                }
                if (head == null)   // no more iterators to track
                    itrs = null;
            }
    
            void removedAt(int removedIndex) {
                for (Node o = null, p = head; p != null;) {
                    final Itr it = p.get();
                    final Node next = p.next;
                    if (it == null || it.removedAt(removedIndex)) {
                        // unlink p
                        // assert it == null || it.isDetached();
                        p.clear();
                        p.next = null;
                        if (o == null)
                            head = next;
                        else
                            o.next = next;
                    } else {
                        o = p;
                    }
                    p = next;
                }
                if (head == null)   // no more iterators to track
                    itrs = null;
            }
    
            void queueIsEmpty() {
                // assert lock.getHoldCount() == 1;
                for (Node p = head; p != null; p = p.next) {
                    Itr it = p.get();
                    if (it != null) {
                        p.clear();
                        it.shutdown();
                    }
                }
                head = null;
                itrs = null;
            }
    
            void elementDequeued() {
                // assert lock.getHoldCount() == 1;
                if (count == 0)
                    queueIsEmpty();
                else if (takeIndex == 0)
                    takeIndexWrapped();
            }
        }
    

    Itr源码:

        private class Itr implements Iterator<E> {
            /** Index to look for new nextItem; NONE at end */
            private int cursor;
            /** Element to be returned by next call to next(); null if none */
            private E nextItem;
            /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
            private int nextIndex;
            /** Last element returned; null if none or not detached. */
            private E lastItem;
            /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
            private int lastRet;
            /** Previous value of takeIndex, or DETACHED when detached */
            private int prevTakeIndex;
            /** Previous value of iters.cycles */
            private int prevCycles;
            /** Special index value indicating "not available" or "undefined" */
            private static final int NONE = -1;
            private static final int REMOVED = -2;
            /** Special value for prevTakeIndex indicating "detached mode" */
            private static final int DETACHED = -3;
    
            Itr() {
                // assert lock.getHoldCount() == 0;
                lastRet = NONE;
                final ReentrantLock lock = ArrayBlockingQueue.this.lock;
                lock.lock();
                try {
                    if (count == 0) {
                        // assert itrs == null;
                        cursor = NONE;
                        nextIndex = NONE;
                        prevTakeIndex = DETACHED;
                    } else {
                        final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                        prevTakeIndex = takeIndex;
                        nextItem = itemAt(nextIndex = takeIndex);
                        cursor = incCursor(takeIndex);
                        if (itrs == null) {
                            itrs = new Itrs(this);
                        } else {
                            itrs.register(this); // in this order
                            itrs.doSomeSweeping(false);
                        }
                        prevCycles = itrs.cycles;
                        // assert takeIndex >= 0;
                        // assert prevTakeIndex == takeIndex;
                        // assert nextIndex >= 0;
                        // assert nextItem != null;
                    }
                } finally {
                    lock.unlock();
                }
            }
    
            boolean isDetached() {
                // assert lock.getHoldCount() == 1;
                return prevTakeIndex < 0;
            }
    
            private int incCursor(int index) {
                // assert lock.getHoldCount() == 1;
                if (++index == items.length)
                    index = 0;
                if (index == putIndex)
                    index = NONE;
                return index;
            }
    
            private boolean invalidated(int index, int prevTakeIndex,
                                        long dequeues, int length) {
                if (index < 0)
                    return false;
                int distance = index - prevTakeIndex;
                if (distance < 0)
                    distance += length;
                return dequeues > distance;
            }
    
            private void incorporateDequeues() {
    
                final int cycles = itrs.cycles;
                final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                final int prevCycles = this.prevCycles;
                final int prevTakeIndex = this.prevTakeIndex;
    
                if (cycles != prevCycles || takeIndex != prevTakeIndex) {
                    final int len = items.length;
                    // how far takeIndex has advanced since the previous
                    // operation of this iterator
                    long dequeues = (cycles - prevCycles) * len
                        + (takeIndex - prevTakeIndex);
    
                    // Check indices for invalidation
                    if (invalidated(lastRet, prevTakeIndex, dequeues, len))
                        lastRet = REMOVED;
                    if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
                        nextIndex = REMOVED;
                    if (invalidated(cursor, prevTakeIndex, dequeues, len))
                        cursor = takeIndex;
    
                    if (cursor < 0 && nextIndex < 0 && lastRet < 0)
                        detach();
                    else {
                        this.prevCycles = cycles;
                        this.prevTakeIndex = takeIndex;
                    }
                }
            }
    
            private void detach() {
                if (prevTakeIndex >= 0) {
                    // assert itrs != null;
                    prevTakeIndex = DETACHED;
                    // try to unlink from itrs (but not too hard)
                    itrs.doSomeSweeping(true);
                }
            }
    
            public boolean hasNext() {
                // assert lock.getHoldCount() == 0;
                if (nextItem != null)
                    return true;
                noNext();
                return false;
            }
    
            private void noNext() {
                final ReentrantLock lock = ArrayBlockingQueue.this.lock;
                lock.lock();
                try {
                    // assert cursor == NONE;
                    // assert nextIndex == NONE;
                    if (!isDetached()) {
                        // assert lastRet >= 0;
                        incorporateDequeues(); // might update lastRet
                        if (lastRet >= 0) {
                            lastItem = itemAt(lastRet);
                            // assert lastItem != null;
                            detach();
                        }
                    }
                    // assert isDetached();
                    // assert lastRet < 0 ^ lastItem != null;
                } finally {
                    lock.unlock();
                }
            }
    
            public E next() {
                // assert lock.getHoldCount() == 0;
                final E x = nextItem;
                if (x == null)
                    throw new NoSuchElementException();
                final ReentrantLock lock = ArrayBlockingQueue.this.lock;
                lock.lock();
                try {
                    if (!isDetached())
                        incorporateDequeues();
                    // assert nextIndex != NONE;
                    // assert lastItem == null;
                    lastRet = nextIndex;
                    final int cursor = this.cursor;
                    if (cursor >= 0) {
                        nextItem = itemAt(nextIndex = cursor);
                        // assert nextItem != null;
                        this.cursor = incCursor(cursor);
                    } else {
                        nextIndex = NONE;
                        nextItem = null;
                    }
                } finally {
                    lock.unlock();
                }
                return x;
            }
    
            void shutdown() {
                // assert lock.getHoldCount() == 1;
                cursor = NONE;
                if (nextIndex >= 0)
                    nextIndex = REMOVED;
                if (lastRet >= 0) {
                    lastRet = REMOVED;
                    lastItem = null;
                }
                prevTakeIndex = DETACHED;
                // Don't set nextItem to null because we must continue to be
                // able to return it on next().
                //
                // Caller will unlink from itrs when convenient.
            }
    
            private int distance(int index, int prevTakeIndex, int length) {
                int distance = index - prevTakeIndex;
                if (distance < 0)
                    distance += length;
                return distance;
            }
    
            boolean takeIndexWrapped() {
                // assert lock.getHoldCount() == 1;
                if (isDetached())
                    return true;
                if (itrs.cycles - prevCycles > 1) {
                    // All the elements that existed at the time of the last
                    // operation are gone, so abandon further iteration.
                    shutdown();
                    return true;
                }
                return false;
            }
        }
    

    相关文章

      网友评论

          本文标题:Java并发-ArrayBlockingQueue源码分析

          本文链接:https://www.haomeiwen.com/subject/ydarhqtx.html