美文网首页
juc6-SynchronousQueue与LinkedTran

juc6-SynchronousQueue与LinkedTran

作者: modou1618 | 来源:发表于2019-02-04 11:33 被阅读0次

一 SynchronousQueue

  • 不存储消息,线程生产消息后休眠等待其他线程消费,被消费后生产者线程才继续往后处理。

1.1 实例化

  • 实例化,公平则是按先进先出排序消费,非公平则不确定顺序消费
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

1.2 入队

  • 入队函数,实际都是调用transferer.transfer(e, true, unit.toNanos(timeout))实现
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

1.3 出队

  • 出队函数,实际都是调用transferer.transfer(null, true, unit.toNanos(timeout))实现,返回出队消息
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    return transferer.transfer(null, true, 0);
}

1.4 TransferQueue

  • 公平锁,按请求先后顺序进行处理

1.4.1 队列存储节点结构

  • 单链表
static final class QNode {
    volatile QNode next;          // next node in queue
// 数据存储,null表示需要数据,非null表示传输数据。
// null->非null表示获取到数据。
// 非null->null表示数据传输完成。
// item=QNode,表示节点被cancel
    volatile Object item;         
    volatile Thread waiter;       // 休眠等待的线程
    final boolean isData;//true 生产者,false 消费者

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

//cas修改链表节点下一节点引用
    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
//cas修改节点数据
    boolean casItem(Object cmp, Object val) {
        return item == cmp &&
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
   //数据引用cas修改为节点引用,即节点取消处理
    void tryCancel(Object cmp) {
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }

    boolean isCancelled() {
        return item == this;
    }
    //是否从链表中退出
    boolean isOffList() {
        return next == this;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
//cas修改时使用
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = QNode.class;
            itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

1.4.2 实例化

  • 初始化链表头尾节点
TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

1.4.3 基本操作

  • 修改head节点
void advanceHead(QNode h, QNode nh) {
    if (h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // forget old next
}
  • 修改尾节点
void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
  • 修改cleanMe节点
boolean casCleanMe(QNode cmp, QNode val) {
    return cleanMe == cmp &&
            UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}

1.4.4 入队/出队函数transfer()

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);//e为null出队,e不为null入队

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // 还未实例化
            continue;                       // 自旋等待
        if (h == t || t.isData == isData) { //队列为空,或队列节点模式相同
            QNode tn = t.next;
            if (t != tail)                  // 尾节点有并发修改,则重新获取
                continue;
            if (tn != null) {  //tail不是真正的尾节点,则先更新tail节点再重新获取
                advanceTail(t, tn);
                continue;//不断循环直到tail为真正的尾节点
            }
            if (timed && nanos <= 0) // can't wait
                return null;
            if (s == null)//若并发导致插入失败,则不为null
                s = new QNode(e, isData);//首次执行则初始化节点,
            if (!t.casNext(null, s))    //插入队尾,若有并发修改,则重头开始重新处理
                continue;

            advanceTail(t, s);  // 更新tail,不关心结果,入队之前会保证tail更新成真正的尾节点
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) { // 节点被cancel
                clean(t, s);
                return null;
            }
            //节点s被消费,
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);   // 释放原head节点,更新s为head
                if (x != null)              // and forget fields
                    s.item = s;//cancel
                s.waiter = null;//清空等待线程
            }
            return (x != null) ? (E)x : e;//返回传递的数据

        } else { //模式不同,则节点消费
            QNode m = h.next;  // 从头节点开始
            if (t != tail || m == null || h != head)
                continue; // 有并发处理,则重头开始重新处理

            Object x = m.item;
            if (isData == (x != null) ||    // m已经被消费
                    x == m ||                   // 节点被cancel
                    !m.casItem(x, e)) {         // 有并发,消费失败
                advanceHead(h, m);          // m出队,处理下一节点
                continue;
            }
//m节点被消费,m出队,
            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);//唤醒m节点的线程,进行后续处理
            return (x != null) ? (E)x : e;//返回传输的数据
        }
    }
}

1.4.5 awaitFulfill()

//自旋次数,NCPUS是cpu核数
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
//1ms,最短休眠时间
static final long spinForTimeoutThreshold = 1000L;

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    //计算限时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
//计算自旋次数,
// 非头节点不自旋。
// 头节点则自旋等待对应的生产者或消费者
// 配置超时时间的,maxTimedSpins = cpu核数小于2则不自旋,否则自旋32次
// 未配置超时时间的,自旋 maxTimedSpins * 16
    int spins = ((head.next == s) ?
            (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);//当前生产者或消费者被中断,则cancel节点
        Object x = s.item;//获取节点数据
        if (x != e)
//生产者节点则item被修改为null
//消费者节点则item被修改为生产数据
//cancel则item被修改为节点引用地址
            return x;
        if (timed) {//指定超时时间
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {//超时到达,cancel节点
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;//自旋
        else if (s.waiter == null)
            s.waiter = w;//配置等待线程
        else if (!timed)//无超时时间,则一直休眠等待
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);//限时休眠,等待唤醒
    }
}

1.4.6 clean()

void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        if (hn != null && hn.isCancelled()) {
//从头节点开始检查cancel节点,若cancel则释放节点
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        if (t == h)//队列已空,则返回
            return;
        QNode tn = t.next;
        if (t != tail)
            continue;
        if (tn != null) {//更新tail为真实的尾节点
            advanceTail(t, tn);
            continue;
        }
        if (s != t) {        // 非尾节点,断开链表连接
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))
                return;
        }
      //待删除节点是尾节点,cleanMe有待删除节点则处理删除,
      // 否则放入cleanMe延迟处理删除
      //cleanMe表示清理我的next节点
        QNode dp = cleanMe;
        if (dp != null) {    // Try unlinking previous cancelled node
            QNode d = dp.next;//待清理节点
            QNode dn;
            if (d == null ||               // d is gone or
                    d == dp ||                 // d is off list or
                    !d.isCancelled() ||        // d not cancelled or
                    (d != t &&                 // d not tail and
                            (dn = d.next) != null &&  //   has successor
                            dn != d &&                //   that is on list
                            dp.casNext(d, dn)))       // d unspliced
                casCleanMe(dp, null);//已清理完成
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred))
            return;          // Postpone cleaning s
    }
}

1.5 TransferStack

  • 非公平消费,新请求不入队,直接先消费模式不一样的head节点。

1.5.1 transfer

E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // 链表为空,或和head模式相同,则入队
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled()) //清理cancelled节点
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;//直接返回null
            } else if (casHead(h, s = snode(s, e, h, mode))) {//插入头节点
                SNode m = awaitFulfill(s, timed, nanos);//休眠等待目标节点。request等待data,data等待request类型
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // 修改next节点为头节点
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { //模式不同,且head节点未被消费,则待插入节点尝试消费head
            if (h.isCancelled())            //cancelled节点则清理
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//插入节点,尝试消费head
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // 链表为空,清理i
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {//尝试消费,消费成功,删除两个节点
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else  // 并发其他已消费,则只删除被消费的节点
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {  // head已被消费,则删除head和消费head的next节点
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // 删除消费head的节点和head
                else                        // lost match
                    h.casNext(m, mn);       // 下一节点不是消费head的,则删除
            }
        }
    }
}

二 LinkedTransferQueue

  • 数据处理private E xfer(E e, boolean haveData, int how, long nanos),所有入队,出队都调用这个函数
  • e有值生产数据,haveData=true。e为null表示消费数据,haveData=false。
  • how表示xfter的模式,有四种。
// for untimed poll, tryTransfer,节点成功消费或消费失败
private static final int NOW   = 0; 
// for offer, put, add。节点存储到队列中,可以后续处理
private static final int ASYNC = 1; 
 // for transfer, take。无不同模式则休眠或自旋等待。
private static final int SYNC  = 2;
 // for timed poll, tryTransfer。指定限时时间休眠等待,超时在处理失败。
private static final int TIMED = 3;
  • xfer函数实现
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))//参数冲突错误
        throw new NullPointerException();
    Node s = null;  // the node to append, if needed

    retry:
    for (;;) {  // restart on append race
        //遍历链表,进行处理
        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;
            Object item = p.item;
           //(item != null) == isData 数据节点未消费
           //item != p 数据节点未释放
            if (item != p && (item != null) == isData) { // unmatched
                if (isData == haveData)   
                    break;// 和新节点模式一样,都是生产或消费数据
                if (p.casItem(item, e)) { // 修改item值,消费节点,
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                        //修改头节点,释放消费q.isMatched()的原头节点
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                         // 节点数<=1,或q未被消费则不再继续处理
                            break;        
                    }
                    LockSupport.unpark(p.waiter);//唤醒线程
                    return LinkedTransferQueue.<E>cast(item);//返回数据
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        if (how != NOW) { // No matches available
            if (s == null)
                s = new Node(e, haveData);
            Node pred = tryAppend(s, haveData);//节点入队列尾
            if (pred == null)//有并发竞争失败,或有相反模式节点可消费
                continue retry;           // lost race vs opposite mode
            if (how != ASYNC)//同步或限时则线程休眠等待
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // NOW或异步类型立即返回
}
  • tryAppend
private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        if (p == null && (p = head) == null) {
            if (casHead(null, s))//空链表
                return s;                 // initialize
        }
        else if (p.cannotPrecede(haveData))
            return null; // 模式相反且未被消费,返回null重新处理
        else if ((n = p.next) != null)    // not last; keep traversing
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))//遍历直到尾节点,插入尾节点
            p = p.next;   // 插入失败则重新获取nexit节点,准备插入
        else {//插入成功
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&//更新tail节点
                        (t = tail)   != null &&
                        (s = t.next) != null && // advance and retry
                        (s = s.next) != null && s != t);
            }
            return p;
        }
    }
}

final boolean cannotPrecede(boolean haveData) {
    boolean d = isData;
    Object x;
    return d != haveData && (x = item) != this && (x != null) == d;
}
  • awaitMatch
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = -1; // initialized after first item and cancel checks
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        if (item != e) {  // 节点已被消费
            // assert item != s;
            s.forgetContents();  // 释放数据引用
            return LinkedTransferQueue.<E>cast(item);//返回数据
        }
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // 中断或超时则节点cancel
            unsplice(pred, s);
            return e;
        }
     //自旋等待或休眠等待
        if (spins < 0) {                  // establish spins at/near front
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            s.waiter = w;                 // request unpark then recheck
        }
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            LockSupport.park(this);
        }
    }
}

相关文章

  • juc6-SynchronousQueue与LinkedTran

    一 SynchronousQueue 不存储消息,线程生产消息后休眠等待其他线程消费,被消费后生产者线程才继续往后...

  • && 与& ,||与|

    回忆知识点i++,,++i变量在前 先用变量符号(☞++/--)在前 先计算

  • 认真与身板

    认真与身板 认真与态度 认真与自信 认真与信心 认真与诚心 认真与正心 认真与正念 认真与正面 认真与精诚 认真与...

  • 与荒野,与你,与自己

    周末了,想跟大家分享一首诗 《阿莱夫》 诗作者:赖尔逊 阿莱夫在草原上盖了一栋房子, 犹如大海上的灯塔。 但你无法...

  • 与雪与丘与故土

  • 与海与浪与念

    木君 下午,在一段段风雨的催促下来到了绥中。天是被蒙起来的,太阳早已不知躲到哪里去了。微弱的日光和着轻柔的海风洒在...

  • 晚风与柳 孤独与狗 桥与落叶 马与白隙 云与苍天 梭与星月 天与地 生与死 树与来路 花与过往 我与你 爱与恨 夜色与酒

  • 海街日记

    和解。与他人和解、与家人和解、与自己和解;与得到和解、与失去和解;与过去和解、与现在、未来和解;与现实和解、与虚幻...

  • 生怕忘了的题目

    少与不少 多与不多 苦与不苦 乐与不乐 对与不对 错与不错 离与不离 合与不合 唱与不唱 说与不说

  • 2017-04-11

    体验入:真诚.与专业。幽默与风趣。赞美与了解。认可与相信。沟通与关注。关心与引领。快乐与持续。简单与重复。 ...

网友评论

      本文标题:juc6-SynchronousQueue与LinkedTran

      本文链接:https://www.haomeiwen.com/subject/ubvysqtx.html