美文网首页JDK源码解析
LinkedTransferQueue源码解析

LinkedTransferQueue源码解析

作者: i砖工 | 来源:发表于2020-04-30 12:17 被阅读0次

之前写了SynchronousQueue的源码解析,其内部实现有两个数据结构:一个是栈,一个是FIFO队列,在之前的文章中主要分析了栈(非公平模式)的实现思路而没有分析队列的实现,其实是因为JDK里面本身有LinkedTransferQueue这个队列的实现,所以今天单独来说下这个队列的实现思路,主要目的方便自己以后快速的回顾。
首先我们回顾一下SynchronousQueue里面的TransferStack的实现方式:
1.栈中节点通过0,1,2来标记节点是获取数据还是生产数据,或者是正在匹配。
2.栈中的节点称为waiter(等待者),等待其它线程与其匹配。
3.匹配过程是将匹配节点入栈,并设置节点操作模式为FULFILLING|mode让后面的匹配节点和等待节点都不能入栈,即达到了匹配动作的线程安全。
LinkedTransferQueue的实现在逻辑上与TransferStack大致相同,不同点在于队列是FIFO的,所以匹配每次操作的都是从头节点开始寻找,直到找到一个未匹配的节点,而入队列的节点是从队尾进入,与队头不冲突,所以在实现上,不会需要像栈一定,将匹配中的节点压入栈。
LinkedTransferQueue的实现也没有像SynchronousQueue里面的实现那样,在transfer(xfer)方法的处理上也有一些不同。在SynchronousQueue的transfer方法上只有3个参数,方法实现是通过是否传入元素e来判断本次操作是获取还是生产,而在LinkedTransferQueue的xfer方法的实现上,还需要手动的传入一个haveData参数来表名本次的操作。(xfer是jdk1.7写的,transfer方法是1.8写的,可以看出作者后面也发现这个参数真没必要)。
先说说总体首先思路吧:
1.一次操作,先从队列头开始检查,看节点是否已经匹配,并且模式是否与本次操作相匹配,如果模式匹配,则进行配对CAS操作。
2.如果头节点已经被匹配,则循环查找下一个节点
3.如果队列为空,或者操作模式与队列中的节点的模式一样,则本次操作构造为节点进入队列成为等待者
4.等待者在队列中睡眠或者自旋,当有调用者将其唤醒后匹配成功出队列

接下来是对代码的详细说明:

private E xfer(E e, boolean putData, int how, long nanos) {
    //判断参数操作模式和e是否匹配,如果是put,则e不为空
    if (putData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race

        //代码1:从头节点开始遍历,直到队列为空
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean nodeIsPutData = p.isData; //头节点操作模式(false为消费,true为生产)
            Object item = p.item; //头节点数据
            
            //代码2
            //代码2-1:如果头节点的数据不等于自己(节点未被匹配,见awaitMatch方法代码1)
            //(item != null) == nodeIsPutData 表明节点的操作模式要与节点的数据对应,这样节点才是正常的
            if (item != p && (item != null) == nodeIsPutData) { // unmatched //如果头节点已经被匹配,则去到代码3,继续循环下一个节点
                if (nodeIsPutData == putData)   // can't match //如果头节点的模式和本次操作的模式一样,则不能匹配,所以跳出循环来到代码4(入队列)
                    break;
                
                if (p.casItem(item, e)) { // match //匹配成功
                    //n1(head: h)->n2->n3->n4->n5(p)->n6->n7(tail)  假如找到n5才匹配上,则说明n1,n2,n3,n4都已经
                    //
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        
                        //将头节点指向p节点(p为尾节点)或者指向p.next,从而p之前的那些节点出队列
                        //这里为什么不是直接指向p.next?为什么有可能指向p,p不是已经被匹配了么 ?
                        //因为如果让head指向null的话还需要cas改变tail,两个cas没办法保证原子性),否则指向q的下一个节点。这里在代码3处得到补偿
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext(); //将next指向自己,这里印证了代码3
                            break; //
                        }                 // advance and retry
                        
                        //重新判断队列里面是否至少还有2个节点,并且第二个是已经被匹配了的,这种情况,再次进入循环,尝试将已经匹配的节点出队列。
                        //if((h = head) != null && (q = h.next) != null && q.isMatched()){
                        //  continue;
                        //}else break;
                        
                        if ((h = head)   == null ||
                            (q = h.next) == null || 
                            !q.isMatched())
                            break;        // unless slack < 2
                        
                    }
                    
                    //代码2-2
                    //唤醒p节点,节点从awaitMatch方法中醒来,然后返回配对的值
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            
            //代码3,如果p!=p.next(p没有出队列),则取下一个节点继续尝试,否则从“头”开始
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        //代码4:如果入队节点模式与等待者相同,或者队列中已经没有待匹配的等待者
        //NOW代表不等待的poll操作,除此之外
        if (how != NOW) {                 // No matches available
            if (s == null)
                s = new Node(e, putData); //本次操作构造节点
            
            Node pred = tryAppend(s, putData); //尝试入队列,详情见tryAppend方法
            if (pred == null)
                continue retry;   //入队失败,从新尝试        // lost race vs opposite mode
            
            //ASYNC代表offer, put, add
            //如果入队列成功,并且是take和timed poll方法,则进入等待
            if (how != ASYNC) //SYNC / TIMED
                //让当前节点进入睡眠状态,等待配对者将其唤醒
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        
        return e; // not waiting
    }
}


/**
将本次操作封装成队列节点入队
入队列逻辑:
1.判断队列是否为空,为空则直接入
2.从队尾入手,判断队尾节点的状态是否与本次操作相同,如果不同则不能入队列
2.1.如果能入队列,则判断循环中的p节点是否还是尾节点,如果不是,则继续往下找,直到尾节点
3.如果p是尾节点,则cas将s节点入队列,如果cas失败,则p重新定位为next,又重新尝试
4.如果入队成功
**/
private Node tryAppend(Node s, boolean putData) {
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        //1.队列为空,将当前节点设置为头节点
        if (p == null && (p = head) == null) {
            if (casHead(null, s))
                return s;                 // initialize
        }
        //2.来到这里说明队列不为空,此时理论上p!=null
        //p节点未被匹配,并且模式与putData相反,则说明s节点无法接入队列
        else if (p.cannotPrecede(putData))
            return null;                  // lost race vs opposite mode
        //2.1 如果已经有新节点入队列了,或者队尾出队了(p.next == p)
        else if ((n = p.next) != null)    // not last; keep traversing
        
            // tail发生了变化,则指向新的tail(u),否则看老的tail节点是否出队列,如果未出,则让p指向p.next,如果已出队列,则说明队列空了,则让p指向null重新尝试
            //让t每次都指向最新的tail,而如果p!=t,则说明p不是尾节点, 如果t已经等于了tail,则单独设置p(如果p!=p.next,则p就指向next,否则p指向null,重新从head开始)
            //如果t不等于tail,则将t指向tail
            //这里为什么要这么复杂? 因为有代码3.可知,节点入队的保证是casNext, 如果保证线程安全的入队,不能只拿到tail就append,因为有可能老的tail已经不是队尾了,
            //所以必须一直遍历到p.next等于null才能入队列,并且如果发现p已经出队列,则将p指向null,然后代码来到1.处,从“头”开始往后遍历。
            //为什么从头开始能保证线程安全呢? 因为节点出队列是casHead来保证的。
            p = (p != t) && t != (u = tail) ? 
                        (t = u) :             
                        ((p != n) ? n : null;  )// restart if off list
                    
        //3.入队列
        else if (!p.casNext(null, s))
            p = p.next;                   // re-read on CAS failure
        //4.如果入队成功返回前节点
        else {
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) && 
                       (t = tail)   != null && 
                       (s = t.next) != null &&
                       (s = s.next) != null && 
                       s != t);                    
                // advance and retry       
            }
            return p;
        }
    }// for over
}

//返回true,说明调用者节点不能作为前继节点
final boolean cannotPrecede(boolean haveData) {
    boolean d = isData; //
    Object x;
    //如果调用者操作模式和本次操作模式不相同
    //并且调用者没有被匹配 x = item) != this && (x != null) == d
    return d != haveData && (x = item) != this && (x != null) == d;
}



private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1; //让出CPU的次数 // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    //自旋方法:睡眠节点等待超时,被中断,或者匹配成功才会返回。
    for (;;) {
        Object item = s.item;
        //代码1:当节点数据放生变化时,说明节点已经被匹配,返回匹配值
        if (item != e) {                  // matched
            // assert item != s;
            s.forgetContents();           // avoid garbage
            return LinkedTransferQueue.<E>cast(item);
        }
        //如果当前线程被中断,或者线程等待超时(timed && nanos <= 0)并且将自己的item指向自己(在代码2-1处可以看到,判断节点是否被匹配,是通过判断item是否等于自己)
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            unsplice(pred, s);
            return e;
        }

        //初始化spins==-1,当其小于0时,
        if (spins < 0) {                  // establish spins at/near front
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        }
        //自旋次数大于0,继续
        else if (spins > 0) {             // spin
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();   //随机让出CPU时间       // occasionally yield
        }
        //自旋完成,准备进入睡眠,先设置等待线程
        else if (s.waiter == null) {
            s.waiter = w;                 // request unpark then recheck
        }
        //如果有时间等待限制,判断是否超过等待时间,如果没有超过等待时间,则让节点睡眠(一定时间,节点超时或者被其它节点唤醒)
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else { //如果没有时间限制,则直接将线程睡眠
            LockSupport.park(this);
        }
    }
}

/**
通过前节点和当前节点的操作模式来判断自旋次数
**/
private static int spinsFor(Node pred, boolean haveData) {
    //如果是多核心CPU,并且前继节点存在,则根据前继节点的情况设置自旋次数。
    if (MP && pred != null) {
        if (pred.isData != haveData)      // phase change //没看懂为什么前节点的模式和本次操作模式会不相等(因为节点都已入队列,说明前节点和本节点类型应该一定相等才对)
            return FRONT_SPINS + CHAINED_SPINS;
        if (pred.isMatched())             // probably at front  //如果前节点以及被匹配,那么自己就是最前面的节点了,那就使用前节前的自旋次数定义
            return FRONT_SPINS;
        if (pred.waiter == null)          // pred apparently spinning //如果前节点的waiter等于空,说明前节点还在自旋,则使用较小的CHAINED_SPINS作为自旋次数
            return CHAINED_SPINS;
    }
    
    return 0;
}

最后再总结一下:
1.队列中的节点称为等待者,等待者首先会自旋一会儿来判断自己是否被匹配上,自旋完了还没匹配上就线程睡眠。
2.判断一个节点是否被匹配,直接判断item跟isData模式上是否能匹配,但是代码里面多加了一个item是否指向自己的判断(我觉得没必要),因为匹配成功的条件就是p.casItem(item, e),见代码2-1
3.判断节点是否出队列,需要判断节点以及节点的next是否指向自己。
4.等待着发现自己被匹配上了,会将自己节点的内容清空:即通过forgetContents方法将item指向自己,将线程指向null.

由于个人能力有限,代码的理解上可能会存在一定的错误,还请各位指正,谢谢。

相关文章

网友评论

    本文标题:LinkedTransferQueue源码解析

    本文链接:https://www.haomeiwen.com/subject/jflzwhtx.html