美文网首页
多线程之阻塞队列

多线程之阻塞队列

作者: OPice | 来源:发表于2019-09-23 13:06 被阅读0次

    阻塞队列

    BlockingQueue

    队列主要有两种:FIFO(先进先出)、LIFO(后进先出)。
    再多线程环境中,队列很容实现数据共享,我们常用的"生产者"、"消费者"模型就可以通过队列来传递数据达到数据共享。但是现实中,大多数情况都是生产者产生消息的速度和消费的速度是不匹配的,就需要相应的对生产或者消费进行阻塞。当生产的消息积累到一定程度时,就需要对生产者就行阻塞,以便消费者将积累的消息进行消费。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全。BlockingQueue释放了我们的双手,他让我们不用关系什么时候去阻塞,什么时候去唤醒线程。

    抛异常 返回false 阻塞 超时,抛异常
    插入 add offer put offer(timeout)
    移除 take remove poll(timeout)
    检查 contains

    操作方法:

    //--------添加 ----------
    boolean add(E e);        //添加元素,加不了抛异常
    boolean offer(E e);      //添加元素,加不了返回false
    void put(E e) throws InterruptedException;  //添加元素,加不了一直阻塞
    boolean offer(E e, long timeout, TimeUnit unit)
           throws InterruptedException; //添加元素,达到指定时间没有加入抛异常
    // -------移除-----------
    boolean remove(Object o);
    E poll(long timeout, TimeUnit unit)
           throws InterruptedException;
    E take() throws InterruptedException;
    // ------
    

    常见的BlockingQueue

    有界性 数据结构
    ArrayBlockingQueue bounded 加锁 ArrayList
    LinkedBlockingQueue optionally-bounded 加锁 LinkedList
    DelayQueue unbounded 加锁 heap
    PriorityBlockingQueue unbounded 加锁 heap
    SynchronousQueue bounded 加锁

    1. ArrayBlockingQueue

    基于数组实现的有界阻塞安全线程队列。
    构造函数

    public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            //初始化重入锁
            lock = new ReentrantLock(fair);
            //初始化读、写等待队列
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
           //初始化构造器
            this(capacity, fair);
            //获取重入锁
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
               //初始化元素中的个数
                count = i;
               //插入元素的下标索引
                putIndex = (i == capacity) ? 0 : i;
            } finally {
               //释放锁
                lock.unlock();
            }
        }
    

    相关属性

    final Object[] items;        //存放元素的数组
    int takeIndex;                 //取元素的下标索引
    int putIndex;                  //存元素的下标索引
    int count;                      //数组中的元素个数
    final ReentrantLock lock;   //数据读取的可重入锁
    private final Condition notEmpty; //读等待的队列
    private final Condition notFull;    //写等待的队列
    

    核心函数
    put

    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
           //如果当前线程没有中断,就将当前线程锁定
            lock.lockInterruptibly();
            try {
                //当前队列已经满了就一直等待
                while (count == items.length)
                    notFull.await();
                //插入元素
                enqueue(e);
            } finally {
               //释放锁
                lock.unlock();
            }
        }
    

    take

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    
    2. LinkedBlockingQueue

    基于链表实现的阻塞队列
    构造函数

    public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }
    

    相关属性

    private final int capacity;           //元素个数
    private final AtomicInteger count = new AtomicInteger();  //
    transient Node<E> head;             //头节点
    private transient Node<E> last;     //尾节点
    private final ReentrantLock takeLock = new ReentrantLock();  //读可重入锁
    private final Condition notEmpty = takeLock.newCondition();  //读等待队列
    private final ReentrantLock putLock = new ReentrantLock(); //写可重入锁
    private final Condition notFull = putLock.newCondition();  //写队列
    

    核心函数
    put

    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
           //获取写的可重入锁
            final ReentrantLock putLock = this.putLock;
            //线程安全的原子操作类
            final AtomicInteger count = this.count;
            //判断线程是否中断
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    //如果加1后还小于当前容量,则唤醒一个等待的线程
                    notFull.signal();
            } finally {
                //释放锁
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    
    3. DelayQueue

    DelayQueue每次都是将元素加入排序队列,以delay/过期时间为排序因素,将快过期的元素放在队首,取数据的时候每次都是先取快过期的元素。
    构造方法

    public DelayQueue() {}
    public DelayQueue(Collection<? extends E> c) {
            this.addAll(c);
        }
    

    相关属性

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();  //根据队列里某些元素排序的有序队列
    private final Condition available = lock.newCondition();
    private Thread leader = null;
    

    核心函数
    offer

    public boolean offer(E e) {
            //获取可重入锁
            final ReentrantLock lock = this.lock;
            //加锁
            lock.lock();
            try {
                //将元素加入优先级队列中
                q.offer(e);
               //如果当前元素为队首,将leader=null,唤起其他线程
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                return true;
            } finally {
               //释放锁
                lock.unlock();
            }
        }
    

    take

    public E take() throws InterruptedException {
           //获取可重入锁
            final ReentrantLock lock = this.lock;
            //判断当前线程是否中断,没有中断就将当前线程锁定
            lock.lockInterruptibly();
            try {
                //循环执行
                for (;;) {
                    E first = q.peek();
                    //如果队首为空,阻塞当前线程
                    if (first == null)
                        available.await();
                    else {
                        //获取当前元素过期时间
                        long delay = first.getDelay(NANOSECONDS);
                        //小于等于0 直接弹出
                        if (delay <= 0)
                            return q.poll();
                         //将first 只为null,避免内存泄漏
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            //阻塞当前线程
                            available.await();
                        else {
                            //将当前线程赋值给leader,然后阻塞delay时间,等待队首元素达到可出队时间
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                 //释放leader引用
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                //如果leader元素为空,优先级队列不为空唤起其他线程
                if (leader == null && q.peek() != null)
                    available.signal();
                //释放锁
                lock.unlock();
            }
        }
    
    4. PriorityBlockingQueue

    无界优先队列
    构造函数

     public PriorityBlockingQueue() {
            this(DEFAULT_INITIAL_CAPACITY, null);
        }
    public PriorityBlockingQueue(int initialCapacity) {
            this(initialCapacity, null);
        }
    public PriorityBlockingQueue(int initialCapacity,
                                     Comparator<? super E> comparator) {
            if (initialCapacity < 1)
                throw new IllegalArgumentException();
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            this.comparator = comparator;
            this.queue = new Object[initialCapacity];
        }
    public PriorityBlockingQueue(Collection<? extends E> c) {
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            boolean heapify = true; // true if not known to be in heap order
            boolean screen = true;  // true if must screen for nulls
            if (c instanceof SortedSet<?>) {
                SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
                this.comparator = (Comparator<? super E>) ss.comparator();
                heapify = false;
            }
            else if (c instanceof PriorityBlockingQueue<?>) {
                PriorityBlockingQueue<? extends E> pq =
                    (PriorityBlockingQueue<? extends E>) c;
                this.comparator = (Comparator<? super E>) pq.comparator();
                screen = false;
                if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                    heapify = false;
            }
            Object[] a = c.toArray();
            int n = a.length;
            // If c.toArray incorrectly doesn't return Object[], copy it.
            if (a.getClass() != Object[].class)
                a = Arrays.copyOf(a, n, Object[].class);
            if (screen && (n == 1 || this.comparator != null)) {
                for (int i = 0; i < n; ++i)
                    if (a[i] == null)
                        throw new NullPointerException();
            }
            this.queue = a;
            this.size = n;
            if (heapify)
                heapify();
        }
    

    相关属性

    private static final int DEFAULT_INITIAL_CAPACITY = 11;   //默认容量
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大容量
    private transient Object[] queue;   //存放元素数组
    private transient int size;  //元素个数
    private transient Comparator<? super E> comparator;  //比较器
    private final ReentrantLock lock;  //可重入锁
    private final Condition notEmpty;  //非空条件
    private transient volatile int allocationSpinLock; //扩容时,CAS更新这个值谁更新成功谁执行
    private PriorityQueue<E> q;//不阻塞的优先队列,用于序列化/反序列化
    

    核心函数
    offer

    public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
             //判断是否需要扩容
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                //根据是否有比较器选择不同方法
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                //唤醒notEmpty条件
                notEmpty.signal();
            } finally {
               //释放锁
                lock.unlock();
            }
            return true;
        }
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            // 取父节点
            int parent = (k - 1) >>> 1;
            // 父节点的元素值
            Object e = array[parent];
            // 如果key大于父节点,堆化结束
            if (key.compareTo((T) e) >= 0)
                break;
            // 否则,交换二者的位置,继续下一轮比较
            array[k] = e;
            k = parent;
        }
        // 找到了应该放的位置,放入元素
        array[k] = key;
    }
    

    take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        E result;
        try {
            // 队列没有元素,就阻塞在notEmpty条件上
            // 出队成功,就跳出这个循环
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            // 解锁
            lock.unlock();
        }
        // 返回出队的元素
        return result;
    }
    private E dequeue() {
        // 元素个数减1
        int n = size - 1;
        if (n < 0)
            // 数组元素不足,返回null
            return null;
        else {
            Object[] array = queue;
            // 弹出堆顶元素
            E result = (E) array[0];
            // 把堆尾元素拿到堆顶
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            // 并做自上而下的堆化
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            // 修改size
            size = n;
            // 返回出队的元素
            return result;
        }
    }
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            // 只需要遍历到叶子节点就够了
            while (k < half) {
                // 左子节点
                int child = (k << 1) + 1; // assume left child is least
                // 左子节点的值
                Object c = array[child];
                // 右子节点
                int right = child + 1;
                // 取左右子节点中最小的值
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                // key如果比左右子节点都小,则堆化结束
                if (key.compareTo((T) c) <= 0)
                    break;
                // 否则,交换key与左右子节点中最小的节点的位置
                array[k] = c;
                k = child;
            }
            // 找到了放元素的位置,放置元素
            array[k] = key;
        }
    }
    
    5. SynchronousQueue

    双栈双队列算法,一个写SynchronousQueue需要和一个读SynchronousQueue组队出现
    构造方法

    public SynchronousQueue() {
            this(false);
        }
    public SynchronousQueue(boolean fair) {
            transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
        }
    

    相关属性

    static final int NCPUS = Runtime.getRuntime().availableProcessors();
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
    static final int maxUntimedSpins = maxTimedSpins * 16;
    static final long spinForTimeoutThreshold = 1000L;
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;
    

    核心方法
    put

    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, false, 0) == null) {
                Thread.interrupted();
                throw new InterruptedException();
            }
        }
      E transfer(E e, boolean timed, long nanos) {
                /* Basic algorithm is to loop trying to take either of
                 * two actions:
                 *
                 * 1. If queue apparently empty or holding same-mode nodes,
                 *    try to add node to queue of waiters, wait to be
                 *    fulfilled (or cancelled) and return matching item.
                 *
                 * 2. If queue apparently contains waiting items, and this
                 *    call is of complementary mode, try to fulfill by CAS'ing
                 *    item field of waiting node and dequeuing it, and then
                 *    returning matching item.
                 *
                 * In each case, along the way, check for and try to help
                 * advance head and tail on behalf of other stalled/slow
                 * threads.
                 *
                 * The loop starts off with a null check guarding against
                 * seeing uninitialized head or tail values. This never
                 * happens in current SynchronousQueue, but could if
                 * callers held non-volatile/final ref to the
                 * transferer. The check is here anyway because it places
                 * null checks at top of loop, which is usually faster
                 * than having them implicitly interspersed.
                 */
    
                QNode s = null; // constructed/reused as needed
                boolean isData = (e != null);
    
                for (;;) {
                    QNode t = tail;
                    QNode h = head;
                    if (t == null || h == null)         // saw uninitialized value
                        continue;                       // spin
    
                    if (h == t || t.isData == isData) { // empty or same-mode
                        QNode tn = t.next;
                        if (t != tail)                  // inconsistent read
                            continue;
                        if (tn != null) {               // lagging tail
                            advanceTail(t, tn);
                            continue;
                        }
                        if (timed && nanos <= 0)        // can't wait
                            return null;
                        if (s == null)
                            s = new QNode(e, isData);
                        if (!t.casNext(null, s))        // failed to link in
                            continue;
    
                        advanceTail(t, s);              // swing tail and wait
                        Object x = awaitFulfill(s, e, timed, nanos);
                        if (x == s) {                   // wait was cancelled
                            clean(t, s);
                            return null;
                        }
    
                        if (!s.isOffList()) {           // not already unlinked
                            advanceHead(t, s);          // unlink if head
                            if (x != null)              // and forget fields
                                s.item = s;
                            s.waiter = null;
                        }
                        return (x != null) ? (E)x : e;
    
                    } else {                            // complementary-mode
                        QNode m = h.next;               // node to fulfill
                        if (t != tail || m == null || h != head)
                            continue;                   // inconsistent read
    
                        Object x = m.item;
                        if (isData == (x != null) ||    // m already fulfilled
                            x == m ||                   // m cancelled
                            !m.casItem(x, e)) {         // lost CAS
                            advanceHead(h, m);          // dequeue and retry
                            continue;
                        }
    
                        advanceHead(h, m);              // successfully fulfilled
                        LockSupport.unpark(m.waiter);
                        return (x != null) ? (E)x : e;
                    }
                }
            }
    

    take

     public E take() throws InterruptedException {
            E e = transferer.transfer(null, false, 0);
            if (e != null)
                return e;
            Thread.interrupted();
            throw new InterruptedException();
        }
    

    相关文章

      网友评论

          本文标题:多线程之阻塞队列

          本文链接:https://www.haomeiwen.com/subject/icjjyctx.html