美文网首页多线程
ReentrantLock和AbstractQueuedSync

ReentrantLock和AbstractQueuedSync

作者: virtual灬zzZ | 来源:发表于2022-01-14 17:00 被阅读0次

ReentrantLock:

主要就是提供锁lock,tryLcok这些,lock的话还是调用内部Sync抽象静态类,它的子继承类(公平同步器和非公平同步器,非公平的效率会高点)的lock方法,它的构造函数可供选择是采用公平还是非公平,而tryLock只能是非公平。

public void lock() {
        sync.lock();
    }
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

AbstractQueuedSynchronizer(AQS)

ReentrantLock内部含有Sync的抽象静态类,就是继承自AQS,abstract static class Sync extends AbstractQueuedSynchronizer,而Sync又分别有两个继承类,FairSyncNonfairSync,其中Sync有个抽象方法 abstract void lock();,所以这两个继承类都必须要重写该方法,这是为了ReentrantLock在调用lock方法的时候,选择对应的Sync的lock方法。

它们都有tryAcquire方法,重写aqs的,aqs默认只是个protected方法,这是尝试获取锁的方法,因为策略的不同,它们的实现也是不同的,它们都在aqs的acquire()方法下进行调用,而它们lock方法的时候就调用acquire()方法了

还有aqs有head、tail、state这三个变量,head和tail都是Node类型的,记录是双向队列的状态的,state主要是记录锁的状态。


AQS的公共acquire方法:
public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt(); 
}

Node属性及其状态

volatile int waitStatus //节点状态
volatile Node prev //当前节点/线程的前驱节点
volatile Node next; //当前节点/线程的后继节点
volatile Thread thread;//加入同步队列的线程引用
Node nextWaiter;//等待队列中的下一个节点, exclusive是null

节点的状态有以下这些:
int CANCELLED = 1//节点从同步队列中取消
int SIGNAL = -1//后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节
点的线程能够运行;
int CONDITION = -2//当前节点进入等待队列中
int PROPAGATE = -3//表示下一次共享式同步状态获取将会无条件传播下去
int INITIAL = 0;//初始状态

模型如下:


流程如下:


非公平同步器NonfairSync的lock

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
·······································
AQS的公共acquire方法:
    public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt(); 
    }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
·········································
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

可见一进来就用cas来抢占,抢不到就进行acquire,acquire中还是调用了tryAcquire,非公平就是nonfairTryAcquire,tryAcquire还是进行抢锁,因为锁的获取释放是动态的,所以这里应该也是得尝试获取锁,因为这个锁同时也是可重入的,因为会有累加,同时释放如果大于1次计数,也是累减的。

如果既然抢不到,那就尝试入队列吧。acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

   private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                tail为空,新建一个空Node来作为head,之后自旋参数Node拼接它后面,维护好双向链表,这步骤很重要
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

先看addWaiter,参数是Node,先说明其他AQS中的queue,就是双向队列,它由Node组成,每个Node有prev和next,形成双向,重要是有个Thread参数记录线程,模式就是排他式,tryAcquire就是排他式的,tryAcquireShared才是共享式,这里不讨论共享式。

由代码可以知道,一进来检查下这个队列尾巴tail存不存在,存在就是cas进行接入tail,接入tail失败,enq会继续尝试,可以看成是一个兜底,成功了就不执行 enq了,而enq里面就是一个死循环,一直设置接入tail成功为止。其实enq可以说是来初始化双向队列和兜底的。还有头部head,没有的话还是cas,这里必须注意,它是个new Node(),没有任何属性的,即它不属于哪一个线程,就是等于搞一个空Node作为head,设置好head之后,那么tail也是它,之后自旋,tail不为null了,参数Node(即当前线程)接入tail即可,参数Node的prev是空Node,next就是null,aqs的head是空Node,tail就是它,等于就是初始化双向队列,cas抢不过或者后续的Node按照正常逻辑继续接入tail,同时各个Node的prev和next都会设置好,其实细心看可以得知enq和addWaiter方法的接入尾部逻辑其实是一样的,在addWaiter中多一步,只为加快效率设置好Node的信息,然后快速返回Node而已

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //乖乖去排队
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

返回得到设置好的Node,回归acquireQueued方法,看看我这个Node是不是第一个排队的,即第二个节点,注意不是第一个节点,第一个节点永远是属性thread等于null的空Node

这里其实不矛盾,日常生活中,队伍中第一个那肯定在处理业务的,第二个才算是排队。

如果我是第一个排队的,我自己也要试一下能不能拿锁成功,说了锁是动态的,拿到锁的那个线程在执行代码,我这个线程也一直在跑,那我肯定得再试一次,如果还是拿不到锁,说明拿锁的大哥还没执行完,那我只能往下执行去入队了, 如果尝试获取锁成功了,就setHead()脱离prev和next,我这个node的thread属性也null了,证明出队成功,我就变成了那个headNode了,所以才说第一个节点永远是属性thread等于null的空Node,持有锁的永远不参与排队。

队列当中的head永远是持有锁的那个node(除了第一次会虚拟一个,其他时候都是持有锁的那个线程锁封装的node),这可能也是开始初始化时弄这么一个冗余节点的原因吧。

出队成功模型:


接着聊 拿不到锁,就进队列:
① 我这个node是第一个排队,但拿不到锁,进队列。
② node不是第一个排队,前头有人在排呢,这就乖乖往后去排队吧。

执行shouldParkAfterFailedAcquire,pred.waitStatus,上面的代码逻辑顺下来这个waitStatus是没有设置的(Node的初始化),所以是0,直接去到compareAndSetWaitStatus(pred, ws, Node.SIGNAL);即可。

   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

/**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

可见见到,进去了,把当前Node的上一个Node的waitStatus置为-1,-1等于是阻塞休眠状态,但他们开始都是0,这个操作居然是当前Node来操控上一个Node的,为什么不自己弄自己成为-1呢,非要下一个Node来设置自己???这里先卖一个关子。

回归正题,把前面的Node弄成-1了,我自己还是0呢,有无发现把前面的Node置为-1,其实是至少需要两次进这个方法shouldParkAfterFailedAcquire()的,(大于两次无非就是waitStatus的cas失败呗),第一次前置Node肯定是0的,接着执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL); , 设置成功了,但还是返回false,回到外面的话那就执行不下去了,再一次自旋,这途径再次判断自己是不是第一个排队的,再判断自己拿不拿到锁,如果不是第一个排队那就拉倒继续排队去,如果我就是第一个排队的,又来tryAcquire尝试拿锁,如果还不行就排队去。

为什么要这样???因为拿不到锁的线程是要去park阻塞的,那操作相对比较重,尝试拿锁只是个cas,比较轻的,况且一直在说锁是运行的。

好了,把前置Node的waitStatus设置为-1了,进入parkAndCheckInterrupt,

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

把自己给park阻塞了,注意!!!!park阻塞了,执行不下去了,return Thread.interrupted(); ,这句是没有执行的!!!!

所以可得,队列中,第一个是没有park,后面的都是park了的。

好了,这里回答刚才那个问题,

当前Node来操控上一个Node的,为什么不自己弄自己成为-1呢,非要下一个Node来设置自己???

在LockSupport.park(this);后面把自己的waitStatus置为-1吧,那是执行不到的,都park还有什么执行,没用。
在前面吧,自己设置成waitStatus=-1,然后自己就被park掉了,如果park失败了呢??那岂不是说 明明自己喝醉酒却偏偏说自己没醉?所以还是得别人来确认是不是真的park成功阻塞了。

队列线程都拿到锁出完队列

这时就只剩下一个空Node了,head、tail都是它,prev和next都是null,就像初始化队列一样。

解锁

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }


看到了,releases就是递减的,如果没有减到0,就是还没释放锁,再次证明它就是可重入锁,直到state==0,要释放锁了,exclusiveOwnerThread都为null,释放看以下代码:

 if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);

 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
         //将 head 节点的 ws 改成 0,清除信号。表示,他已经释放过了。不能重复释放。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
       // 如果 next 是 null,或者 next 被取消了。就从 tail 开始向上找节点。
        if (s == null || s.waitStatus > 0) {
            s = null;
         //从尾部开始,向前寻找未被取消的节点,直到这个节点是 null,或者是 head。
         //也就是说,如果 head 的 next 是 null,那么就从尾部开始寻找,直到不是 null 为止,找到这个 head 就不管了。
         //如果是 head 的 next 不是 null,但是被取消了,那这个节点也会被略过。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
      //唤醒 head.next 这个节点。
      //通常这个节点是 head 的 next。
     //但如果 head.next 被取消了,就会从尾部开始找。
        if (s != null)
            LockSupport.unpark(s.thread);
    }

需要看head的waitStatus满足条件(h!=null && h.waitStatus!=0)才能执行unpark的,如果h==null呢,那最好了,证明没有队列,没有队列就证明没有竞争,当成是线程之间交替执行没有冲突就好。有队列才有竞争,如果存在竞争,headNode就不是null了,在unparkSuccessor里,把自己的waitStatus设置为0,不再是-1,还叫醒了NextNode。

叫醒了nextNode,nextNode重新执行了:

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //乖乖去排队
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

nextNode执行到 return Thread.interrupted();,这句无论false还是true,还是会继续自旋,再经过解锁了,锁的state已经是0表示没人持有了,tryAcquire()如果失败了,继续阻塞,如果成功,NextNode便成了头部,释放Thread属性,就是这个Thread拿到锁了,形成空Node,重构下双向队列关系就行了,然后return interrupted结束自旋退出,无论interrupted是true还是false,不管执行还是不执行selfInterrupt(),这acquire()方法返回void,这不重要,重要的是拿到锁,继续执行业务代码。就是说lock其实就是不响应中断的,要想响应中断,就用lockInterruptibly(); ,后续再聊lockInterruptibly();

public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt(); 
    }

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

所以解锁就是:每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程。

公平锁

其实公平锁和非公平锁的区别在于:tryAcquire()方法,公平锁多了个 !hasQueuedPredecessors() ,其他的addWaiter、acquireQueued都是公用的。

公平锁:
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

非公平锁:
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

重点就一个:hasQueuedPredecessors()方法,就是看看队列还有无排队的,注意是取反的。重点是返回false,跳出外面为true,才去cas进行获取锁。

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
  1. 首先h!=t:队头不等于队尾:
  • false(h==t):返回false后面就不用执行,直接跳出去是true,那就争锁。情况:
    ①没有初始化队列
    ②队列就剩下最后那个Node
    那就不需要排队,直接争锁完事。

  • true(h!=t)::队列已经有了。接 2

 ((s = h.next) == null || s.thread != Thread.currentThread());

s指第一个排队的Node

  1. s = h.next == null :
  • false( s = h.next != null): 队伍不止一个,只要后面(s.thread != Thread.currentThread())是false,就能去抢锁。
    ①:s.thread != Thread.currentThread() == false,这时来的就是第一个排队的thread,为什么能再来一次,首先说前提,锁的state===0是自由的,上面介绍acquireQueue()方法里waitStatus的原因进行2次自旋,判断是不是第一个排队,如果是就接着tryAcquire()就又来到这里了,返回false,总体返回false,外面就是true,就可以cas抢锁了。
    ②:s.thread != Thread.currentThread() == true,这时过来的NewThread显然不是第一个排队的,乖乖入队就行。

  • true( s = h.next == null): 队伍中只有一个在排队的,跟前面的(h!=t)矛盾了,这也进不来了,但这里判断并不是原子操作,就算返回true,外面false,继续park,但拿锁的Thread执行完也会unpark它,这时候再判断就又去抢锁,个人感觉也是OK的。

个人还想到一种情况,就是t1正在拿锁,t2进来拿不到就去初始化队列,但是初始化队列并不是原子操作,t3来了,t2还没初始化完,cpu时间片恰巧给了t3,t3经过上面的判断,也能抢锁,不需要入队,有可能t3就率先拿到锁了,并不是规定是t3必须在t2后面,个人感觉这样说得通,因为多线程存在多种可能性。

参考:
JUC AQS ReentrantLock源码分析(一)
AQS简介
AQS

相关文章

网友评论

    本文标题:ReentrantLock和AbstractQueuedSync

    本文链接:https://www.haomeiwen.com/subject/llzbcrtx.html