美文网首页
ReentrantLock原理 之 lock & unlock

ReentrantLock原理 之 lock & unlock

作者: perseverance_li | 来源:发表于2020-12-08 18:07 被阅读0次

概述

以下讨论基于JDK1.8

ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。

CAS:Compare and Swap,比较并交换。CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现。

AQS:AbstractQueuedSynchronizer,它是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。

AQS使用一个FIFO的队列表示排队等待锁的线程,基于这个队列ReentrantLock的基本实现原理可以概括为:先通过CAS尝试获得锁,如果此时已经有线程占据锁,那就加入队列的队尾且被挂起。当锁被释放之后,队列中第一个线程被唤起,然后通过CAS尝试获取锁。在这个时候,如果:

非公平锁:如果同时还有另外一个新线程进来,它可以不用排队直接来争抢锁,此时可能会被这个线程抢到锁。

公平锁:如果同时还有另外一个新线程进来,当它发现已经有人在队列中排队时,此线程就会排到队尾,由队首的线程获取锁。

所以非公平锁效率要高于公平锁,因为非公平锁减少了线程挂起唤醒的时间。

类继承关系

ReentrantLock.png

ReentrantLock特点

* 可重入
* 可中断
* 公平锁/非公平锁
* 使用CAS
* 相对于synchronized更为灵活

ReentrantLock构造方法

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

NonfairSync:非公平锁
FairSync:公平锁

NonfairSync

一. lock()

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

使用CAS操作判断state是否为0(0表示锁未被占用),如果是0则设置为1,并把当前线程设置为该锁的独占线程,即当前线程获得锁。
“非公平”即体现在这里,如果占用的线程释放锁,state置为0,而排队的线程还未被唤醒时,新来的线程可能直接抢占了该锁,那么就是插队成功。

假设当前有三个线程来竞争锁,A线程CAS操作成功获,拿到了锁,那么B C线程设置state失败,接下来会进入acquire方法:

二. acquire()

public final void acquire(int arg) {
    //尝试获取锁
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取state状态
    int c = getState();
    if (c == 0) {//当前没有线程占用
        //使用CAS来这是state状态
        if (compareAndSetState(0, acquires)) {
            //占用锁成功,设置当前线程为独享
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//如果当前线程已经占用锁
        //将state + 1 ,即锁重入
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //更新state,记录重入次数
        setState(nextc);
        return true;
    }
    //获取锁失败
    return false;
}

总结:非公平锁tryAcquire流程:检查state为0未被占用,那么尝试占用,若不为0,检查当前锁是否被自己占用,若是被自己占用,增更新state记录重入次数。如果都没有成功,则获取锁失败,返回false。

三. addWaiter()

当前A线程已经获得锁,BC线程尝试获取锁失败后,就会进入等待队列,如果A线程长时间持有锁,BC线程会被挂起。

private Node addWaiter(Node mode) {
        //初始化节点,设置关联线程,模式为独享(mode可参考Node中各个模式)
        Node node = new Node(Thread.currentThread(), mode);
        //获取尾结点
        Node pred = tail;
        //如果尾结点不为空,说明队列已经存在
        if (pred != null) {
            node.prev = pred;
            //将新节点设置为尾结点 CAS方式
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //尾结点为空,说明队列还没有初始化,需要初始化head节点并且入队
        enq(node);
        return node;
    }

BC线程同时尝试入队,由于队列尚未初始化,所以至少有一个线程会走到enq(node)

    //初始化队列,并将新节点入队
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //如果队列为空,新建一个节点设置为head节点,并且将tail尾结点指向head
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //如果队列不为空,将新节点入队
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

BC线程进入enq方法,此时队列为空,那么只有一个线程通过compareAndSetHead创建head节点成功,然后开始第二轮循环,第二轮循环此时tail节点已经不为空,两个线程会进入else,假设B线程通过compareAndSetTail成功,那么B线程入队并返回,C由于入队失败需要第三轮循环。最终所有线程入队成功。当BC线程入队后,此时AQS的队列如下:


AQS入队.jpg

四.acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取前置节点
            final Node p = node.predecessor();
            //如果前置节点是head,则当前节点在第二位,此时该节点可以尝试获取锁
            if (p == head && tryAcquire(arg)) {
                //当前节点获取锁成功,并将其设置为head节点
                setHead(node);
                //原head节点出队,等待被GC
                p.next = null; // help GC
                //获取锁成功
                failed = false;
                //返回线程是否被中断过
                return interrupted;
            }
            //判断线程获取锁失败后是否可以挂起?若可以才挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //设置线程被中断标志位为true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

BC线程在获取锁时,如果A线程一直占用锁,则BC线程通过tryAcquire获取锁都会失败,因为会进入第二个if语句,判断线程是否可以挂起。

五.shouldParkAfterFailedAcquire()

这个方法的主要作用为判断当前线程获取锁失败后是否需要挂起。
查看这个方法时需要先了解Node中几个字段意义:

字段 类型 默认值 描述
SHARED Node new Node() 节点为共享模式
EXCLUSIVE Node null 节点为独占(排它)模式
CANCELLED int 1 节点超时或被中断时设置为取消状态,用来设置waitStatus
SIGNAL int -1 当前节点的后节点被park,当前节点释放时,必须调用unpark通知后面的节点,当后面的节点竞争时,会将前面的节点更新为SIGNAL,用来设置waitStatus
CONDITION int -2 标识当前节点处于等待中(通过条件进行等待的状态),用来设置waitStatus
PROPAGATE int -3 共享模式下释放节点时设置的状态,被标记为当前状态是表示无限传播下去,用来设置waitStatus
waitStatus volatile int 0 等待状态,默认0,标识正常同步等待
prev volatile Node null 队列中的上一个节点
next volatile Node null 队列中的下一个节点
thread volatile Thread null 当前节点所对应的线程
nextWaiter Node null 指向下一个处于阻塞的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取前置节点的状态
    int ws = pred.waitStatus;
    //如果前置节点状态为SIGNAL,则返回true,当前线程可挂起
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
         //大于0肯定是取消状态,不会通知
         //就一直往前找,找到一个能通知自己的,挂在那个节点后面,方便自己休眠有人唤醒自己
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //将前置节点的状态设置为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    //前置节点不是SIGNAL状态返回false
    //下次在进入该方法要么前置节点是SIGNAL状态,要么就是上面的CAS失败了,重新CAS更新状态后才返回true
    return false;
}

//通过LockSupport.park(this)挂起当前线程,重置返回当前线程中断状态
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

线程入队后是否挂起的前提是,它的前置节点状态为SIGNAL,这要能保证前置节点获取锁后能唤醒自己。所以shouldParkAfterFailedAcquire先判断当前节点的前置节点是否是SIGNAL状态,若符合返回true,然后调用parkAndCheckInterrupt将自己挂起。如果前置节点状态不符合,再看前置节点是否为CANCELLED,若是就向前遍历知道找到第一个符合要求的前置节点,最后的else是通过CAS的方式将前置节点状态设置为SIGNAL。

此时Node队列的状态可能如下:


AQS队列节点挂起状态.jpg

六. unlock()

public void unlock() {
    //每次unlock将state递减1
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //释放锁完成后,获取head节点,并head节点waitStatus不为0
        if (h != null && h.waitStatus != 0)
            //唤醒下一个线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    //获取当前线程state数量,并将其减1
    int c = getState() - releases;
    //如果当前线程不是独占线程则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //如果-1之后state为0
    if (c == 0) {
        //free设置为true
        free = true;
        //清空当前的独占线程,此时已无线程使用
        setExclusiveOwnerThread(null);
    }
    //如果之前的锁有重入,则重新记录重入次数
    setState(c);
    return free;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        //如果节点状态<0,是否为SIGNAL状态
        //通过CAS重置状态
        compareAndSetWaitStatus(node, ws, 0);
    //获取下一个节点
    Node s = node.next;
    //如果下一个节点为空,或者状态为取消状态
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从尾结点开始遍历
        for (Node t = tail; t != null && t != node; t = t.prev)
            //找到最前面且状态为SIGNAL的节点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //唤醒这个节点
        LockSupport.unpark(s.thread);
}

总结:流程为先尝试解锁,若解锁成功,那么查看Head节点状态是否为SIGNAL,如果是则唤醒头结点的下一个节点关联的线程,如果释放失败则返回false表示解锁失败。所以每次解锁成功后唤醒的都是Head节点的下一个节点关联的线程。

FairSync

公平锁的上锁过程与非公平锁存在一些不同点,但整体流程大致是一样的,下面介绍一下区别:

一. lock()

final void lock() {
    acquire(1);
}

相对于非公平锁来说lock方法直接调用了acquire()方法,而非公平锁新进来的线程有机会通过compareAndSetState抢占说,这个是公平与非公平的体现。

二.tryAcquire()

tryAcquire相对于非公平锁来说多了一个hasQueuedPredecessors方法,这个方法的意思是,当前队列如果已经有线程在排队了,则当前线程也需要去排队。

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //如果当前队列已经有线程排队,hasQueuedPredecessors返回true,则当前线程tryAcquire失败,进入后续排队代码
        //如果hasQueuedPredecessors返回false,则当前线程通过CAS方式尝试获取锁,如果成功则返回true,并设置当前线程为独占线程
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
}

相关文章

网友评论

      本文标题:ReentrantLock原理 之 lock & unlock

      本文链接:https://www.haomeiwen.com/subject/izgbwktx.html