美文网首页
java并发编程(6):BlockingQueue相关源码详解

java并发编程(6):BlockingQueue相关源码详解

作者: 桥头放牛娃 | 来源:发表于2019-11-02 07:23 被阅读0次

    BlockingQueue的类继承结构如下,其主要实现类有:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,LinkedTransferQueue。

    BlockingQueue的类继承结.png

    1、ArrayBlockingQueue

    ArrayBlockingQueue是由数组组成的有界阻塞队列。队列中元素按照FIFO的方式进行排序。默认情况下线程以非公平的方式获取队列中的数据。

    1.1、主要属性

    //队列元素的容器,以数组作为队列的实现
    final Object[] items;
    
    //下一个获取(take、poll、peek、remove)元素数据的数组索引
    int takeIndex;
    
    //下一个存储(put、offer、add)元素的数组索引
    int putIndex;
    
    //队列中元素的个数
    int count;
    
    //锁
    final ReentrantLock lock;
    
    //队列不为空的信号量,即当插入数据到队列的时候,
    //会通过notEmpty信号量通知其他等待获取队列数据的线程
    private final Condition notEmpty;
    
    //队列不满的信号量,即当线程从队列中取出数据后,
    //会通过notFull信号量通知其他线程可以继续向队列中存储元素
    private final Condition notFull;
    

    1.2、构造函数

    //初始化队列的容量
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    //初始队列的容量,及是否为公平锁
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    //初始化队列容量,是否为公平锁、及初始队列中的元素
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
    
        final ReentrantLock lock = this.lock;
        //获取锁
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                //将初始化的元素放入队列中
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }
    

    1.3、入队操作

    //offer:非阻塞入队操作
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //队列中元素的数量已经达到队列容量的上线,则直接返回false,否则将元素enqueue()
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    //put:阻塞入队操作
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //若队列空间达到容量上线,则调用notFull.await()等待其他线程取走只是一个队列中的数据
            while (count == items.length)
                notFull.await();
            //当队列不满的时候,直接将数据入队
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    //offer:超时时间内入队数据
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //若队列已满
            while (count == items.length) {
                //无需等待超时,则直接返回false
                if (nanos <= 0)
                    return false;
                //等待超时时间nanos后再检查队列是否还是满的    
                nanos = notFull.awaitNanos(nanos);
            }
            //若队列不满则做入队操作
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    //入队操作
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        //将数据插入队尾
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //通知其他等待获取数据的线程
        notEmpty.signal();
    }
    

    1.4、出队操作

    //poll:非阻塞获取队列中的数据
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //队列为空,则返回null;否则返回队头数据
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    //take:阻塞获取队列中数据
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //队列为空,则在notEmpty信号量上等待其他线程将数据放入队列
            while (count == 0)
                notEmpty.await();
            //获取队列中数据    
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    //poll:在超时时间内获取数据
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //队列为空,如无需等待,则直接返回null;否则等待nanos超时时间
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    //直接获取队头的数据,但不改变takeIndex值,即可重复peek()获取队头的数据
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
    
    //获取队头数据
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //通知其他在notFull上等待的线程    
        notFull.signal();
        return x;
    }
    
    

    1.5、移除数据

    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        
        //移除的是队头的数据?则直接移除队头数据,并修改takeIndex值
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove
    
            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                //将队列中的数据前移    
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    //队尾数据
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }
    

    2、LinkedBlockingQueue

    LinkedBlockingQueue是基于单向链表的阻塞队列。

    2.1、基本属性

    //链表中的节点
    static class Node<E> {
        E item;
    
        //后继节点
        Node<E> next;
    
        Node(E x) { item = x; }
    }
    
    //队列的容量
    private final int capacity;
    
    //当前队列中元素的个数
    private final AtomicInteger count = new AtomicInteger();
    
    //队列的头结点
    transient Node<E> head;
    
    //队列的尾节点
    private transient Node<E> last;
    
    //出队(take,poll等)的锁
    private final ReentrantLock takeLock = new ReentrantLock();
    
    //出队等待的信号量
    private final Condition notEmpty = takeLock.newCondition();
    
    //入队锁
    private final ReentrantLock putLock = new ReentrantLock();
    
    //入队等待的信号量
    private final Condition notFull = putLock.newCondition();
    

    2.2、入队方法

    //阻塞入队数据
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //当前队列已满?则在notFull信号量上等待,直到有其他线程从队列中获取元素
            while (count.get() == capacity) {
                notFull.await();
            }
            //当队列未满时,将元素入队,并增加队列元素计数count
            enqueue(node);
            c = count.getAndIncrement();
            //若此时队列还未满,则在notFull中发信号通知下个在notFull等待的线程进行数据的入队操作
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //当队列已满时,在notEmpty发信号,通知其他等待此信号的线程
        if (c == 0)
            signalNotEmpty();
    }
    
    //超时时间内入队
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //队列已满则且无需等待,则直接返回false,否则在notFull上等待超时
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            //数据入队及元素计数
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    
    //非阻塞入队操作,入队成功返回true,否则为false,不进行阻塞操作
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
    

    2.3、出队方法

    //阻塞等待出队
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    //超时等待出队
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    //非阻塞等待出队
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    3、PriorityBlockingQueue

    PriorityBlockingQueue为有优先级的阻塞队列实现。

    3.1、基本属性

    //默认初始化的队列数组容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
    //数组最大容量
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    
    //队列数组,实际存储的是树型的数据,不过用数组来存储树
    //实际结构为二叉树,而树的父节点一定比子节点大,但左右子节点没有大小的要求
    private transient Object[] queue;
    
    //队列中元素的个数
    private transient int size;
    
    //元素优先级比较器
    private transient Comparator<? super E> comparator;
    
    //对外操作的锁
    private final ReentrantLock lock;
    
    //notEmpty等待队列
    private final Condition notEmpty;
    
    //用户空间分配的自旋锁
    private transient volatile int allocationSpinLock;
    
    //用于序列化和反序列化的队列
    private PriorityQueue<E> q;
    

    3.2、入队操作

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        //容量已满,需要扩容?
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            //将数据插入树中
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
    
    //将元素向数的上层移动
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            //获取父节点下标
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            //若当前数据比父节点大,则无需上移,否则将当前节点上移
            if (key.compareTo((T) e) >= 0)
                break;
            //将父节点下移    
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    
    

    3.3、出队操作

    //阻塞出队操作,在此只分析take,其他如poll,peek类似
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
    
    //取出独立头部位置,即去除树中的根节点数据
    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
    
    //调整树结构
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
    

    4、SynchronousQueue

    SynchronousQueue为无缓冲的阻塞队列,其主要用来在两个线程之间交换数据。因其数据时无缓冲的。即当生产者生产数据的速度大于消费者消费数据的速度,会导致生产者线程长时间阻塞,导致服务不可用。

    4.1、Transferer接口

    SynchronousQueue中通过Transferer对数据进行交换。SynchronousQueue的出队及入队方法基本都是调用Transferer#transfer()方法进行处理的。而Transferer实现有两种,一个是公平交换的,一个是非公平交换的。

    abstract static class Transferer<E> {
        //e:交换的数据,当为null时,表示要获取数据;当不为null时,表示插入数据
        //timed:true:运行超时;false:不允许超时
        //nanos:超时时间
        abstract E transfer(E e, boolean timed, long nanos);
    }
    

    4.2、TransferStack交换堆

    TransferStack是非公平的交换堆,其不是某个消费者或生产者等待的时间越久就越先获取交换权。

    TransferStack主要实现源码:

    static final class TransferStack<E> extends Transferer<E> {
        //模式,表示当前操作是消费数据还是生产数据
        //REQUEST:需要消费数据
        //DATA:需要生产数据
        //FULFILLING:表示一对生产者和消费者已经匹配上了
        static final int REQUEST    = 0;
        static final int DATA       = 1;
        static final int FULFILLING = 2;
    
        //判断节点状态是否为:FULFILLING
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
    
        //TransferStacks的节点数据结构
        static final class SNode {
            //堆中下个节点
            volatile SNode next;        // next node in stack
            //和当前节点匹配的节点
            volatile SNode match;       // the node matched to this
            //当前在节点等待的线程
            volatile Thread waiter;     // to control park/unpark
            //节点数据:null表示为消费者节点;否则为生产者节点
            Object item;                // data; or null for REQUESTs
            //当前节点模式:REQUEST或DATA或FULFILLING
            int mode;
    
            SNode(Object item) {
                this.item = item;
            }
                
            //cas设置打当前节点的next值   
            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
    
            //尝试将当前节点与s节点配对
            boolean tryMatch(SNode s) {
                //cas设置当前节点的math节点
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    //唤醒与当前节点配对的math节点线程
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }
        }
    
        //交换堆的头结点
        volatile SNode head;
    
        //cas设置头结点
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }
    
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            
            //数据交换处理,循环以下三个处理,直到数据交换成功或超时
            //1、若头结点为空或头结点的模式与当前节点一样,即都是生产者或消费者,
            //则直接将当前节点插入堆的头结点,并等待其他和当前节点能匹配的节点,
            //匹配上就返回匹配的节点,若节点取消则返回null
            
            //2、若头结点的模式与当前节点模式互相补足,即能配对,即一个生产者一个消费者,
            //则向堆中头部插入当前节点,并设置当前节点状态为fulfilling,设置匹配节点的math节点,
            //然后同时弹出当前节点与其math的节点,还会唤醒与其math的节点。
            
            //3、若堆的头结点已经与其他节点配对,
            //则进行配对处理并将配对的两个节点弹出
            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;
    
            for (;;) {
                SNode h = head;
                //头结点为空或头节点与当前节点模式一致
                if (h == null || h.mode == mode) {  // empty or same-mode
                    //若不需要超时,则直接将当前节点放入堆的头节点,或直接返回null
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    //需要超时,则将节点插入头部并等待超时        
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        //匹配成功,将两个节点弹出
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                 //头结点未匹配上?   
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    //头结点已取消?则更新头节点为下个节点
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    //更新头结点为当前节点,并设置当前节点模式为已配对模式
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            //节点匹配
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
    
        //在超时时间内等待匹配上节点
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            //等待的截止时间点
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            //自旋次数
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                //自旋一定次数后线程阻塞自己等待配对线程唤醒
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
    

    4.3、TransferQueue交换堆

    TransferQueue是公平的交换堆,其等待越久的会先对其进行匹配操作,从而达到公平交换。

    TransferQueue主要实现源码:

    static final class TransferQueue<E> extends Transferer<E> {
        //交换器中节点数据结构
        static final class QNode {
            //下个节点
            volatile QNode next;          // next node in queue
            //节点数据
            volatile Object item;         // CAS'ed to or from null
            //在节点等待的线程
            volatile Thread waiter;       // to control park/unpark
            //当前节点是否为数据节点
            final boolean isData;
        }
    
        //头节点
        transient volatile QNode head;
        //尾节点
        transient volatile QNode tail;
        //已经取消的节点引用,可能这个节点还未从队列中移除
        transient volatile QNode cleanMe;
    
       //cas将nh设置为新的头节点
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                h.next = h; // forget old next
        }
    
        //cas将nt设置为新的尾节点
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
        }
    
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            
            //数据交换,处理流程同TransferStack大同小异
            //1、
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);
    
            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin
    
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;
    
                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }
    
                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;
    
                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read
    
                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }
    
                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }
    
        Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }
    }
    

    5、LinkedTransferQueue

    LinkedTransferQueue是基于单链表实现的阻塞队列,节点数据入队时可以有几种模式,如AYNC、SYNC、TIMED、NOW等。不同模式应用于不同的场景。LinkedTransferQueue相较于LinkedBlockingQueue,可以直接将数据交给等待的节点,而无需入队,性能更高;LinkedTransferQueue相较于SynchronousQueue则多了存储元素的缓冲队列,可实现非阻塞的数据入队操作,避免了SynchronousQueue因消费者慢于生产者而导致的生产者阻塞问题。

    5.1、主要属性

    //cpu数是否大于1
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;
    
    //在阻塞等待之前自旋的次数
    private static final int FRONT_SPINS   = 1 << 7;
    
    //节点被其他节点自旋操作的次数
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
    
    //sweepVotes的阀值
    static final int SWEEP_THRESHOLD = 32;
    
    //头结点
    transient volatile Node head;
    
    //尾节点
    private transient volatile Node tail;
    
    //累计到一定次数再清除无效node
    private transient volatile int sweepVotes;
    
    //节点状态
    //实时模式,立即进行数据交换
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    //异步模式,若数据交换不成功,则将数据入队
    private static final int ASYNC = 1; // for offer, put, add
    //同步模式,会一直等到交换成功
    private static final int SYNC  = 2; // for transfer, take
    //超时模式,等待一定的超时时间后才失败
    private static final int TIMED = 3; // for timed poll, tryTransfer
    
    
    //等待的节点数据结构
    static final class Node {
        //是否为数据节点
        final boolean isData;   // false if this is a request node
        //节点数据
        volatile Object item;   // initially non-null if isData; CASed to match
        //下个等待的节点
        volatile Node next;
        //在当前节点等待的线程
        volatile Thread waiter; // null until waiting
    
        //cas设置下个节点
        final boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
    
        //cas设置节点数据
        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
    

    5.2、数据交换操作

    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed
    
        retry:
        for (;;) {                            // restart on append race
            //
            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                //p节点未被匹配到?
                if (item != p && (item != null) == isData) { // unmatched
                    //节点模式不一样,无法匹配,直接跳出循环
                    if (isData == haveData)   // can't match
                        break;
                    //模式不一样,则cas尝试匹配    
                    if (p.casItem(item, e)) { // match
                        //匹配成功,则cas将下个节点从队列中移除
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            //无下个节点了?
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        //唤醒配对的节点
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }
            //模式为非立即匹配?
            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                //尝试入队    
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                //同步操作?则等待匹配    
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
    
    //将节点入队,放入队尾
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            //队列为空,直接入队
            if (p == null && (p = head) == null) {
                if (casHead(null, s))
                    return s;                 // initialize
            }
            //当前节点与队尾几点模式不同,无法入队?
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            //非队尾?    
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            //cas插入队尾        
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                //将数据添加到队尾
                if (p != t) {                 // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }
    
    //等待数据节点匹配,直到超时
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed
    
        for (;;) {
            Object item = s.item;
            // 如果s元素的值不等于e,说明它被匹配到了
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return LinkedTransferQueue.<E>cast(item);
            }
            // 如果当前线程中断了,或者有超时的到期了
            // 就更新s的元素值指向s本身
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel
                unsplice(pred, s);
                return e;
            }
            // 如果自旋次数小于0,就计算自旋次数
            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            //自旋处理
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            //需要超时等待,则阻塞当前线程等待
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            }
            else {
                LockSupport.park(this);
            }
        }
    }
    

    5.3、入队及出队操作

    入队出队(put、offer、add,take、poll等)操作,基本都是调用xfer进行数据交换操作,只是不同操作模式不同。

    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }
    
    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
    
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
    
    
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
    
    
    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }
    
    public E poll() {
        return xfer(null, false, NOW, 0);
    }
    
    
    

    相关文章

      网友评论

          本文标题:java并发编程(6):BlockingQueue相关源码详解

          本文链接:https://www.haomeiwen.com/subject/cvaavctx.html