美文网首页
StampedLock readLock() 源码解析

StampedLock readLock() 源码解析

作者: Xzzzl | 来源:发表于2019-05-09 16:52 被阅读0次

    //每次reader+1
    private static final long RUNIT = 1L;
    //倒数第八位,写锁标志位
    private static final long WBIT = 1L << LG_READERS;
    //倒数后七位,读线程数量
    private static final long RBITS = WBIT - 1L;
    //最大读线程数量
    private static final long RFULL = RBITS - 1L;
    //用它检测是否在写以及当前读线程数量
    private static final long ABITS = RBITS | WBIT;
    private static final long SBITS = ~RBITS; // note overlap with ABITS

    static final class WNode {
    volatile WNode prev; //前置节点
    volatile WNode next; //后置节点
    volatile WNode cowait; // list of linked readers
    volatile Thread thread; // 当前线程
    volatile int status; // 状态位!全靠它实现所有无锁/CAS和锁操作
    final int mode; // 读或写
    WNode(int m, WNode p) { mode = m; prev = p; }
    }

    public long readLock() {
        long s = state, next;
        //1. CLH队列空
        //2. 没有写,读线程没满
        //3. 尝试用CAS获得读资格
        //上述条件都满足时直接返回修改后的状态位(读线程+1),否则继续尝试获得读锁
        return ((whead == wtail && (s & ABITS) < RFULL &&
                U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                next : acquireRead(false, 0L));
    }
    
    private long acquireRead(boolean interruptible, long deadline) {
        //当前节点,p尾指针
        WNode node = null, p;
        //SPINS时最大尝试次数
        for (int spins = -1;;) {
            //h头指针
            WNode h;
            //CLH空,不断的使用CAS去尝试
            if ((h = whead) == (p = wtail)) {
                for (long m, s, ns;;) {
                    //没有写,而且读线程未满就用CAS尝试
                    //否则尝试增加读线程溢出
                    //都成功就返回
                    if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                        return ns;
                        //有写线程,再来一次循环
                    else if (m >= WBIT) {
                        if (spins > 0) {
                            if (LockSupport.nextSecondarySeed() >= 0)
                                --spins;
                        }
                        else {
                            //尝试失败
                            if (spins == 0) {
                                WNode nh = whead, np = wtail;
                                //h,p指针没错,并且CLH队列不空,跳出整个自旋
                                if ((nh == h && np == p) || (h = nh) != (p = np))
                                    break;
                            }
                            //设置最大尝试次数,再来一次
                            spins = SPINS;
                        }
                    }
                }
            }
            // initialize queue,后面的操作都在插入CLH队列
            // new一个空WNode,让头指针指向它,以后只要头尾指针相同,则CLH队列为空
            if (p == null) {
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                node = new WNode(RMODE, p);
            else if (h == p || p.mode != RMODE) {
                if (node.prev != p)
                    node.prev = p;
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    break;
                }
            }
            //让尾节点到时通知当前节点(他在尾节点后面)
            else if (!U.compareAndSwapObject(p, WCOWAIT,
                    node.cowait = p.cowait, node))
                node.cowait = null;
            else {
                //到这说明CLH不空而且node会被尾节点通知,即CLH队列状态就绪
                for (;;) {
                    WNode pp, c; Thread w;
                    // 唤醒CLH队列第一个节点
                    if ((h = whead) != null && (c = h.cowait) != null &&
                            U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                            (w = c.thread) != null)
                        U.unpark(w);
                    // CLH队列只有一个节点/空/p是无意义的头节点
                    // 只要没有写线程一直自旋CAS
                    if (h == (pp = p.prev) || h == p || pp == null) {
                        long m, s, ns;
                        do {
                            if ((m = (s = state) & ABITS) < RFULL ?
                                    U.compareAndSwapLong(this, STATE, s,
                                            ns = s + RUNIT) :
                                    (m < WBIT &&
                                            (ns = tryIncReaderOverflow(s)) != 0L))
                                return ns;
                        } while (m < WBIT);
                    }
                    // CLH无变化,即没有冲突
                    //之前的尝试都失败,准备阻塞线程
                    if (whead == h && p.prev == pp) {
                        long time;
                        if (pp == null || h == p || p.status > 0) {
                            node = null; // throw away
                            break;
                        }
                        //deadline=0L,本方法内也不会被改变
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, p, false);
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) &&
                                whead == h && p.prev == pp)
                            U.park(false, time);
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        //interruptible=false,不会改变
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, p, true);
                    }
                }
            }
        }
    
        //到这说明:尝试次数耗尽并且此过程中无冲突/已将node加入空队列尾
        for (int spins = -1;;) {
            //h头指针/p尾指针
            WNode h, np, pp; int ps;
            //CLH队列空
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long m, s, ns;
                    //和之前相似,尝试CAS
                    if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                        WNode c; Thread w;
                        whead = node;
                        node.prev = null;
                        //自己得到读锁,就唤醒他人
                        while ((c = node.cowait) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT,
                                    c, c.cowait) &&
                                    (w = c.thread) != null)
                                U.unpark(w);
                        }
                        return ns;
                    }
                    else if (m >= WBIT &&
                            LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        break;
                }
            }
            //队列不空,就从头唤醒队列中元素
            else if (h != null) {
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                            (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                //node成为队尾
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                //让p状态为waiting
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                    //p已经取消,从队列中删除
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    //到这说明node已经入队并且整个队列之前元素的状态已经设置完成
                    //和之前相似,准备阻塞线程
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 &&
                            (p != h || (state & ABITS) == WBIT) &&
                            whead == h && node.prev == p)
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:StampedLock readLock() 源码解析

          本文链接:https://www.haomeiwen.com/subject/piswoqtx.html