美文网首页
ReentrantLock之FairSync公平锁加锁过程

ReentrantLock之FairSync公平锁加锁过程

作者: 小超_f598 | 来源:发表于2019-08-01 12:00 被阅读0次

下面会通过一步步的场景进行分析ReentrantLock公平锁的源码,文章为比较长,做好准备。。。

首先ReentrantLock默认创建的就是非公平锁,所以要在ReentrantLock的构造函数中传入true,创建公平锁

ReentrantLock lock = new ReentrantLock(true);

lock.lock();

下面我们来一步步分析公平锁进行加锁 lock 的源码实现:

注意,会通过一个个线程进入lock的形式进行分析;

当线程T1调用 lock():注意T1是第一个去竞争锁的线程,后面的T2,T3。。默认都是在T1还没执行完的情况下去分析的;

public voidlock() {

    sync.lock();

}

继而进入FairSync里面的lock()方法:

final voidlock() {

    acquire(1);

}

接着进入acquire(1)方法,这里面才是重点:

public final voidacquire(intarg) {

    if(!tryAcquire(arg) &&

    acquireQueued(addWaiter(Node.EXCLUSIVE),arg))

    selfInterrupt();

}

这个acquire(int arg)由四个方法构成:

tryAcquire:尝试获得锁

addWaiter:将当前线程封装成Node对象

acquireQueued:将当前线程表示的Node对象放到链表中进行排队,同时park当前线程;

selfInterrupt:相应线程中断;

代码块1 :分析acquire方法;

public final voidacquire(intarg) {

//首先会进入tryAcquire(arg)方法,看到这行注释,请把目光移到下面代码块2中。。。

//代码块2 返回了true,而!tryAcquire(arg) 将返回结果取了反,也就是true变为了false,所以该方法结束,当前线程T1已经占有锁;

    if(!tryAcquire(arg) &&

        acquireQueued(addWaiter(Node.EXCLUSIVE),arg))

            selfInterrupt();

}

代码块2 :

tryAcquire中的源码:

protected final boolean tryAcquire(int acquires) {

//current是当前来获取锁的这个线程,也就是我们说的T1线程

        final Thread current = Thread.currentThread();

    //state 表示锁状态,默认是0

//因为T1是第一个进来加锁的,所以这里getState()得到的结果肯定等于0,所以 c = 0

        int c = getState();

        if (c == 0) {

            //因为c=0,那么进入到这里

            //hasQueuedPredecessors:作用主要是判断当前AQS队列中有没有其他线程在排队,具体看下面详细说明,请将目光移到 代码块3

//通过分析代码块3,得知hasQueuedPredecessors会返回false,而!hasQueuedPredecessors()取了反,所以这里的结果变成了true,

            //接着进入了compareAndSetState,进行CAS操作,将state改变为1,表示占有锁 ,返回true

//compareAndSetState 返回了true ,进入到 setExclusiveOwnerThread,这里是将当前线程设置为占有锁线程,用来支持锁重入

            //结果返回true,将true带回到代码块1

            if (!hasQueuedPredecessors() &&

                    compareAndSetState(0, acquires)) {

                setExclusiveOwnerThread(current);

                return true;

            }

        }

        else if (current == getExclusiveOwnerThread()) {

            int nextc = c + acquires;

            if (nextc < 0)

                throw new Error("Maximum lock count exceeded");

            setState(nextc);

            return true;

        }

        return false;

    }

}

代码块3:

hasQueuedPredecessors中的源码:

public final boolean hasQueuedPredecessors() {

  //因为T1是第一个进来加锁的线程,那么这里的tail = null , head = null,因为还没有线程去初始化他们,所以肯定是null;

    Node t = tail;

    Node h = head;

    Node s;

//h != t 相当于 head != tail 相当于 null != null,所以这里肯定返回的是false,下面的判断就不用再看了,先分析T1进来的情况

    return h != t &&

        ((s = h.next) == null || s.thread != Thread.currentThread());

}

然后返回false;带着false将目光回到代码块2

总结:

当第一个线程进来获得锁的时候,会将state由原来默认的0改为1,注意,此时并没有初始化链表,也就是说,如果线程都是交替执行,那么永远跟AQS队列无关;

假设 T1 还没有执行完,此时 T2 来了,也调用了 lock() ,T2 也要获得锁:

通过上面的分析,我们直接进入到这个方法:注意这里是T2线程,T1还没有执行完,锁还在T1那里

代码块1 :分析acquire方法;

public final voidacquire(intarg) {

    //首先会进入tryAcquire(arg)方法,看到这行注释,请把目光移到下面代码块2中。。。

    //代码块2 返回了false,而 !tryAcquire(arg) 将返回结果取了反,也就是false变为了true,这逻辑将进入到acquireQueued(addWaiter(Node.EXCLUSIVE),arg) ,

//首先会先进入到addWaiter(Node.EXCLUSIVE),请将目光转向代码块3

//addWaiter(Node.EXCLUSIVE)经过代码块3和代码块4的分析,这里返回了当前线程T2表示的node对象,带着这个返回的node进入到acquireQueued方法当中,带着node 和 1,将目光移到代码块5

    if(!tryAcquire(arg) &&

       acquireQueued(addWaiter(Node.EXCLUSIVE),arg))

            selfInterrupt();

}

代码块2 :

tryAcquire中的源码:

protected final boolean tryAcquire(int acquires) {

    //当前线程,也就是T2线程

    final Thread current = Thread.currentThread();

    //因为T1线程还没有执行完,所以T1线程还持有锁,那么getState()返回的必然不是0,假设T1没有重入,那么getState()返回的是1 , c = 1 

    int c = getState();

    // 因为 c = 1 ,所以逻辑不会进入到这里

    if (c == 0) {

        if (!hasQueuedPredecessors() &&

                compareAndSetState(0, acquires)) {

            setExclusiveOwnerThread(current);

            return true;

        }

    }

// 因为 c = 1 所以来到这个判断,getExclusiveOwnerThread返回的当前持有锁的线程,也就是T1, current = T2,那么T2 == T1?当然不等于,返回false,带着false回到代码块1中

    else if (current == getExclusiveOwnerThread()) {

        int nextc = c + acquires;

        if (nextc < 0)

            throw new Error("Maximum lock count exceeded");

        setState(nextc);

        return true;

    }

    return false;

}

代码块3:

private Node addWaiter(Node mode) {    

    //将当前线程封装为Node对象,这个Node当中维护了当前线程的状态、前一个节点,后一个节点,当前线程的引用等等

    Node node = new Node(Thread.currentThread(), mode);    

    //因为T1获得锁的时候,并没有初始化AQS队列,所以head节点和tail节点都是null,所以pred = tail = null;

    Node pred = tail;

    // pred = null ,所以这个条件不成立,将进入下面的enq方法

    if (pred != null) {

        node.prev = pred;

        if (compareAndSetTail(pred, node)) {

            pred.next = node;

            return node;

        }

    }

    //当线程第一次初始化AQS队列的时候,也就是队列中还没有任何等待的线程,enq方法很重要,请将目光移到代码块4:

    enq(node);

    //代码块4执行完,返回了当前线程T2表示的node节点,返回node,带着node回到代码块1中

    return node;

}

代码块4:

private Node enq(final Node node) {

    //这里是个死循环

    for (;;) {

        //第一次进入循环,tail 肯定为 null ,因为队列中没有其他等待的线程,所以 t = tail = null

        //第二次进入循环,t = tail = head = new Node();

        Node t = tail;

//第一次进入循环,t = null,所以会进入到里面的逻辑

 //第二次进入循环,t = tail = head = new Node(),也就是不等于null,所以进入到else 逻辑;

        if (t == null) { // Must initialize

//第一次进入循环,当tail = null 的时候,说明队列还没有被初始化,也就是AQS还没有任何线程进来排队过,通过compareAndSetHead 为 head 创建一个空Node,哨兵节点

//第一次进入循环,同时tail = head = new Node(),接着进入下一次循环

            if (compareAndSetHead(new Node()))

                tail = head;

        } else {

            //第二次循环会进入到这里,因为第一次循环中创建了哨兵节点head

            // node.prev = t :当前节点的前驱节点指向 t = tail = head,也就是指向head节点

            node.prev = t;

            // 通过CAS将当前T2线程表示的节点node设置tail节点

            if (compareAndSetTail(t, node)) {

                //t.next = node ,t = head,所以将head节点的后继节点指向当前T2线程表示的节点

                t.next = node;

            //返回当前节点,带着当前线程T2表示的node,返回到代码块3

                return t;

            }

        }

    }

}

代码块5:

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;

    try {

        boolean interrupted = false;

        //这里是一个死循环

        for (;;) {

            //第一次循环,获得当前节点的前驱节点,也就是head节点

            //第二次循环,获得当前节点的前驱节点,也就是head节点

            final Node p = node.predecessor();

//第一次循环,p == head ? 当然等于,所以进入tryAcquire尝试获得锁,因为前面的处理过程中,可能T1已经执行完了,这里其实也相当于自旋,我们假设T1还没有执行完,这里没有获取到锁返回false

//第二次循环,P == head ? 当然等于,所以再次进入到tryAcquire中尝试获得锁,又一次自旋,假设还是没有获得到锁,因为T1还没有执行完,锁还没有被释放

            if (p == head && tryAcquire(arg)) {

                setHead(node);

                p.next = null; // help GC

                failed = false;

                return interrupted;

            }

        //第一次循环,带着当前节点node的前驱节点head,和当前节点node,进入到shouldParkAfterFailedAcquire方法之中,这里很重要,请看代码块6

//第一次循环,从代码块6中的shouldParkAfterFailedAcquire返回了false,那么第一次循环结束

//第二次循环,带着当前节点node的前驱节点head,和当前节点node,再次进入到shouldParkAfterFailedAcquire方法之中,请看代码块6

//第二次循环,从代码块6中的shouldParkAfterFailedAcquire返回了true,代表着将进入parkAndCheckInterrupt方法将当前线程休眠,等待被唤醒,代码阻塞在里面,至此,T2进入睡眠;

            if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

代码块6:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    //外面第一次循环:pred = head , node 为 T2,因为T2刚刚才初始化了head节点,所以head节点中的waitStatus = 0;

    //外面第一次循环:ws = 0 ;

    //外面第二次循环:因为在第一次的时候,我们已经将head节点中waitStatus改为-1

    int ws = pred.waitStatus;

    //外面第一次循环:ws == Node.SIGNAL == -1?ws = 0,所以不会进入到这里面

    //外面第二次循环:ws == Node.SIGNAL == -1?ws = -1,所以会进入到这里面

    if (ws == Node.SIGNAL)

        /*

         * This node has already set status asking a release

         * to signal it, so it can safely park.

         */

        //返回true,带着这个true,回到代码块5中

        return true;

    if (ws > 0) {

        /*

         * Predecessor was cancelled. Skip over predecessors and

         * indicate retry.

         */

        do {

            node.prev = pred = pred.prev;

        } while (pred.waitStatus > 0);

        pred.next = node;

    } else {

//外面第一次循环:会来到这里,通过CAS将head节点中的waitStatus 改 -1,然后返回false,带着false回到代码块5

        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

    }

    return false;

}

总结:T2在经过一次获取锁,两次自旋后还是没有获得到锁,初始化了head哨兵节点,并将head节点中的waitStatus = -1 ,T2进入了睡眠,等待唤醒;

此时,T1还是没有执行完,T3又来竞争锁:

代码块1:

public final void acquire(int arg) {

//还是通过tryAcquire尝试获得锁,这里不在分析tryAcquire,因为T1还没有被执行完,T2正在睡眠,tryAcquire还是没有获得到锁,返回false,则进入代码块2

//代码块2中,addWaiter成功将当前T3表示的node加入到链表尾部,并返回当前node

    //带着当前线程T3表示的node进入到acquireQueued,这里面其实就是自旋和当前是否休眠的操作,请看代码块3

    if (!tryAcquire(arg) &&

        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

        selfInterrupt();

}

代码块2:

private Node addWaiter(Node mode) {    

    //将当前线程封装为Node对象,这个Node当中维护了当前线程的状态、前一个节点,后一个节点,当前线程的引用等等

    Node node = new Node(Thread.currentThread(), mode);    

    //因为T2刚刚已经初始化了AQS队列,并创建了head哨兵节点,同时将tail 设置了等于T2线程表示的node,所以tail = T2表示的node;

    Node pred = tail;

    // pred = T2node  ,所以这个条件成立,将进入方法

    if (pred != null) {

        //将当前节点的前驱节点指向T2表示的node

        node.prev = pred;

        //将tail表示的节点等于当前节点T3表示的node,相当于T3表示node是尾节点

        if (compareAndSetTail(pred, node)) {

            //pred = T2表示的node,所以这里是将T2表示的node的下一个节点指向当前T3表示的node,这就是一个双向链表的数据结构,即使后面T4,T5。。。也是这样链接起来

            pred.next = node;

// 返回当前node,带着node回到代码块1中

            return node;

        }

    }

    //当线程第一次初始化AQS队列的时候,也就是队列中还没有任何等待的线程,enq方法很重要,请将目光移到代码块4:

    enq(node);

    //代码块4执行完,返回了当前线程T2表示的node节点,返回node,带着node回到代码块1中

    return node;

}

代码块3:

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;

    try {

        boolean interrupted = false;

        //这里是一个死循环

        for (;;) {

            //第一次循环,获得当前节点的前驱节点,也就是T2表示的node节点

        //第二次循环,获得当前节点的前驱节点,也就是T2表示的node节点

            final Node p = node.predecessor();

            //第一次循环,p == head ? 当然不等于,因为现在当前节点的前驱节点是T2表示的node节点,直接返回了false,进入到下面的shouldParkAfterFailedAcquire当中,请看代码块4

            //第二次循环,P == head ? 当然不等于,所以再次进入到tryAcquire中尝试获得锁,又一次自旋,假设还是没有获得到锁,因为T1还没有执行完,锁还没有被释放

            if (p == head && tryAcquire(arg)) {

                setHead(node);

                p.next = null; // help GC

                failed = false;

                return interrupted;

            }

            //第一次循环,带着当前节点node的前驱节点head,和当前节点node,进入到shouldParkAfterFailedAcquire方法之中,这里很重要,请看代码块4

            //第一次循环,从代码块4中的shouldParkAfterFailedAcquire返回了false,那么第一次循环结束

            //第二次循环,带着当前节点T3表示的node的前驱节点T2 node,和当前节点T3 node,再次进入到shouldParkAfterFailedAcquire方法之中,请看代码块4

            //第二次循环,从代码块4中的shouldParkAfterFailedAcquire返回了true,代表着将进入parkAndCheckInterrupt方法将当前线程休眠,等待被唤醒,代码阻塞在里面,至此,T3进入睡眠;

            if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

代码块4:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    //外面第一次循环:pred = T2表示的node , node 为 T2,因为T2刚刚将head节点的waitStatus = -1之后自己进入了睡眠,但是T2的waitStatus还是等于0;

    //外面第一次循环:ws = 0 ;

    //外面第二次循环:因为在第一次的时候,我们已经将T2 node节点中waitStatus改为-1

    int ws = pred.waitStatus;

    //外面第一次循环:ws == Node.SIGNAL == -1?ws = 0,所以不会进入到这里面

    //外面第二次循环:ws == Node.SIGNAL == -1?ws = -1,所以会进入到这里面

    if (ws == Node.SIGNAL)

        /*

         * This node has already set status asking a release

         * to signal it, so it can safely park.

         */

        //返回true,带着这个true,回到代码块3中

        return true;

    if (ws > 0) {

        /*

         * Predecessor was cancelled. Skip over predecessors and

         * indicate retry.

         */

        do {

            node.prev = pred = pred.prev;

        } while (pred.waitStatus > 0);

        pred.next = node;

    } else {

        //外面第一次循环:会来到这里,通过CAS将head节点中的waitStatus 改 -1,然后返回false,带着false回到代码块5中

        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

    }

    return false;

}

总结:T3 和 T2的过程差不多,只是T2多了个初始化head节点的操作;

至此,当前AQS中,T1正在实行,T2 ,T3正在队列中睡眠,这就是当线程竞争锁不到时排队的情景;

下面我们来看看另外一个场景:

假设T2在获取不到锁的时候,准备加入队列中的时候,T1执行完了,释放了锁,这个时候是怎样的情况?

代码块1:

public final void acquire(int arg) {

//当T1未执行完时,T2线程来了,并且tryAcquire在尝试获得锁失败,在经过addWaiter封装成了node对象进入acquireQueued准备排队,我们来看看此时acquireQueued的情况

    if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

        selfInterrupt();

}

代码块2:

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

        //由于在addWaiter中已经初始化了head节点,当前node排在head之后

            // p = head = node.predecessor()

            final Node p = node.predecessor();

        // p == head ? 明显是等于的,这个时候进入tryAcquire尝试获得锁,在这之前T1执行完了,所以这里T2获得到了锁,返回true,进入

            if (p == head && tryAcquire(arg)) {

                //将当前节点node设置为head节点,因为他已经获得锁了,不用在排队了,所以把head=null,让GC回收,当前节点node变为了head

                //setHead当中做了三个事情:1.将head等于当前node,2.将当前节点node的前驱节点=null,3.将当前节点的thread等于null

                setHead(node);

                //因为当前节点node已经变为了head,所以head的next也是等于空

                p.next = null; // help GC

                failed = false;

                return interrupted;

            }

            if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

以上就是ReentrantLock之公平锁的加锁过程,后续我们会分析加锁过程

总结:

1.公平锁在调用lock的时候总共尝试获得锁几次?

答:

1.如果是第一个线程,则一次就获得成功

2.如果是第二个线程,第一个线程还没有执行完,那么会经过三次尝试获得锁,在得不到锁的情况进入休眠

相关文章

网友评论

      本文标题:ReentrantLock之FairSync公平锁加锁过程

      本文链接:https://www.haomeiwen.com/subject/xtdqdctx.html