美文网首页JavaJava 程序员
ReentrantLock 源码分析

ReentrantLock 源码分析

作者: 马小莫QAQ | 来源:发表于2022-05-27 21:42 被阅读0次

    ReentrantLock 特征

    特点:

    1. 可重入
    2. 公平/非公平
    3. 可中断
    4. 支持条件等待
    5. 可设置锁超时

    常用 API

    使用例子:

    public class ReentrantLockTest {
    
        static ReentrantLock lock = new ReentrantLock(true);
    
        static class ClientThread extends Thread {
            @Override
            public void run() {
                System.out.println(Thread.currentThread() + "开始尝试获取锁");
                lock.lock();
                try {
                    System.out.println(Thread.currentThread() + "成功获取锁");
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    System.out.println(Thread.currentThread() + "完成释放锁");
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            ClientThread t1 = new ClientThread();
            ClientThread t2 = new ClientThread();
            ClientThread t3 = new ClientThread();
            t1.start();
            t2.start();
            t3.start();
    
            TimeUnit.SECONDS.sleep(10);
        }
    }
    

    源码分析

    获取锁

    如果我使用下面的代码进行获取锁:

    ReentrantLock lock = new ReentrantLock();
    lock.lock();
    lock.unlock();
    

    ReentrantLock 默认调用的就是非公平锁调用栈:

    • java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
    final void lock() {
        // 直接尝试加锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 如果获取锁失败进入 AQS acquire 逻辑
            acquire(1);
    }
    

    如果 compareAndSetState(0, 1)能够直接执行成功,那么将直接结束方法的执行。如果失败,那么就会调用acquire 方法如下:

    public final void acquire(int arg) {
        // tryAcquire(arg) 尝试获取锁
        // acquireQueued 获取锁失败进行等待队列
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    我们先看 tryAcquire方法:

    • java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

    他会直接调用到 nonfairTryAcquire非公平锁的加锁逻辑里面有两个逻辑:

    • 如果当前状态无锁,直接尝试加锁,加锁成功返回 true
    • 如果当前时锁重入,那么直接修改 AQS 锁状态共享变量值 state 等于 c + acquires, 加锁成功返回 ture
    • 如果都不满足,那么返回加锁失败返回 false
    // 非公平锁的逻辑
    // 如何理解插队, 这里的插队是当前队列中被唤醒的线程, 和当前加入的线程都可以被执行
    // 如果当前加入线程比队列中唤醒的线程先获取到锁, 就是插队现象
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 无锁状态, 尝试竞争
        if (c == 0) {
            if (compareAndSetState(0, acquires)) { //是否获取到锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 当前线程持有锁, state 计数 +1
        else if (current == getExclusiveOwnerThread()) { //判断是否是重入
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    如果 tryAcquire 调用完成后是获取锁成功 ****acquire方法执行结束,最后代表 lock 方法执行结束。

    获取锁失败进入同步队列

    如果获取锁失败,那么就会执行 acquire代码后面段 if 逻辑的执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    这里其实可以分为两个方法来看

    • addWaiter(Node.EXCLUSIVE)
    • acquireQueued(xxx, arg)

    按照执行顺序,我们先看 addWaiter(Node.EXCLUSIVE) 这里主要是入队的逻辑。
    addWaiter: java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter

    private Node addWaiter(Node mode) {
        // 将当前线程转换为 AQS Node 节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 尝试直接加入到尾节点
        Node pred = tail;
        if (pred != null) {
            // 当前节点的前驱节点指向尾节点
            node.prev = pred;
            // cas 修改 tail 节点,如果成功返回 node 
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果失败,调用 enq
        enq(node);
        return node;
    }
    

    enq是将当前节点插入队列,必要的时候会进行初始化

    //将节点插入队列,必要时进行初始化。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 如果没有尾节点,那么需要进行初始化
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } 
            // 如果有尾节点/其实就是有头节点/已经被初始化,通过 CAS 入队
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    前面我们看看完了,当前获取锁的线程当获取锁失败的时候,成功进入 AQS 队列,接下来我们继续看
    acquireQueued又做了什么呢?

    • 如果是队列头节点,会再次尝试获取锁
    • 如果修改 java.util.concurrent.locks.AbstractQueuedSynchronizer.Node状态位
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            // 是否中断
            boolean interrupted = false;
            for (;;) {
                // 获取 node 的前驱节点
                final Node p = node.predecessor();
                // 如果是头节点,再次尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 将 node 设置为 头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 判断是否需要进行阻塞当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 阻塞线程
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 是否失败
            if (failed)
                // 如果失败,取消获取锁
                cancelAcquire(node);
        }
    }
    

    上面我们可以看到,for (;;)中有两个判断

    • 如果是头节点,就调用tryAcquire尝试获取锁 (之前我们已经分析过 tryAcquire 了,我们主要看后面个 if )
    • 如果不是就进入 shouldParkAfterFailedAcquire 方法

    在调用 acquireQueued这个过程中可能调用多次 shouldParkAfterFailedAcquire 方法。shouldParkAfterFailedAcquire 会执行一下几个操作。

    • 可以用来修改当前节点的状态,
    • 和对链表上无效的节点出队
    /** 
     * 当获取锁失败后, 检查更新新节点状态如果是需要阻塞返回, true
     * <p>
     * 一个前继节点 waitStatus = 0, 第一次将继续设置为 SIGNAL, 告诉当前线程准备进入阻塞, 此时依旧获取不到, 当前线程进入阻塞
     *
     * @param pred 前继节点
     * @param node 当前节点
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 前继节点的状态, 第一次进入的话, 一定是 0
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                // 出队, 剔除无效的节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 第一次进来, pred.waitStatus = 0 执行这个分支
            // 将前继节点的状态修改为 SIGNAL, 表示 pred.next 节点需要被唤醒(此时准备进入阻塞, 但是还未被阻塞, 再次获取锁失败之后才会被阻塞)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    当 Node 被修改 Node.SIGNAL状态后,第一个 if 返回 true , 我们再次回到 acquireQueued 方法,就会执行 parkAndCheckInterrupt 方法,就是将当前的线程 park 然后返回当前线程的中断状态。

    private final boolean parkAndCheckInterrupt() {
        // 阻塞线程
        LockSupport.park(this);
        // 返回线程中断状态
        return Thread.interrupted();
    }
    

    注意:这里线程 park 过后,其实获取锁就结束了前半段的操作,完成同步队列的入队,并且进入等待。我们就需要等待解锁唤醒。

    释放锁

    释放锁的代码如下:

    lock.unlock();
    

    释放锁做了什么呢?

    • 释放当前锁的状态
    • 在 AQS 队列中去唤醒排队的头节点

    调用栈如下:
    java.util.concurrent.locks.ReentrantLock#unlock

    • java.util.concurrent.locks.AbstractQueuedSynchronizer#release

    我们可以从 release方法开始

    // 解锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 判断是否有需要唤醒的线程
            if (h != null && h.waitStatus != 0) //waitStatus 的值为 0, 只有当后继存在节点才会被设置为该值不为 0, 此时需要唤醒后继线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    释放锁,主要是调用 tryRelease, 首先就是考虑之前的重入问题,直接对 state 进行 -1 ,然后如果 c == 0表示当前线程不再持有锁,我们就可以修改 ownerThread == null . 这个时候,最后修改 state 为新值。

    // tryRelease 
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // 判断是否是当前线程持有锁
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            // 如果 state == 0 表示当前线程不在占有该锁
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    释放锁成功后,再次回到 release方法,会再次判断,如果 AQS 队列不为空,那么就进行排队线程唤醒。
    主要是调用 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor

    // 唤醒队列中的线程
    private void unparkSuccessor(Node node) {
        // 将当前节点状态修改为 0  
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        // 反向查找
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒队列中的节点
            LockSupport.unpark(s.thread);
    }
    

    其实这里最关键的就是 LockSupport.unpark(s.thread); 这里就会回到 acquireQueued,执行唤醒后强锁的逻辑,依然在 acquireQueued里面。

    释放锁后唤醒等待节点

    当前节点被唤醒逻辑,首先会在 shouldParkAfterFailedAcquire 方法中出队,然后尝试加锁如果加锁成功就返回 true.

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            // 出队的逻辑
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    再次竞争锁,主要是在acquireQueued方法中调用 tryAcquire方法进行获取锁。如果获取锁失败,就又再次获取锁,如果获取锁成功返回。

    测试和实践

    支持锁中断

    如果通过 lock.lockInterruptibly();_ 方式加锁,如果当前线程出现中断过后,会抛出 _java.lang.InterruptedException线程中断异常,所以 ReentrantLock支持可中断。
    相关源码:

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        // LockSupport.park 会清除中断信号
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    
    // 
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 抛出中断异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    实验代码:

    public class ReentrantLockTest {
    
        static ReentrantLock lock = new ReentrantLock(true);
    
        static class ClientThread implements Runnable {
    
            @SneakyThrows
            @Override
            public void run() {
                System.out.println(Thread.currentThread() + "开始尝试获取锁");
                lock.lockInterruptibly();
                try {
                    System.out.println(Thread.currentThread() + "成功获取锁");
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    System.out.println(Thread.currentThread() + "完成释放锁");
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(new ClientThread(), "t1");
            Thread t2 = new Thread(new ClientThread(), "t2");
            Thread t3 = new Thread(new ClientThread(), "t3");
            t1.start();
            t2.start();
            // 锁中断
            //lock.lockInterruptibly();
            TimeUnit.SECONDS.sleep(1);
            t3.start();
            TimeUnit.SECONDS.sleep(1);
            t3.interrupt();
    
    
    
            TimeUnit.SECONDS.sleep(10);
        }
    }
    

    输出结果:

    Thread[t1,5,main]开始尝试获取锁
    Thread[t2,5,main]开始尝试获取锁
    Thread[t1,5,main]成功获取锁
    Thread[t3,5,main]开始尝试获取锁
    Exception in thread "t3" java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
        at io.zhengsh.juc._1lock.reentrantlock.ReentrantLockTest$ClientThread.run(ReentrantLockTest.java:18)
        at java.lang.Thread.run(Thread.java:748)
    Thread[t1,5,main]完成释放锁
    Thread[t2,5,main]成功获取锁
    Thread[t2,5,main]完成释放锁
    

    获取锁设置超时

    lock.tryLock(2, TimeUnit.SECONDS)可以支持设置获取锁的超时时间,可以有效的避免线程饥饿问题

    测试代码:

    public class ReentrantLockTryTest {
        static ReentrantLock lock = new ReentrantLock(true);
    
        static class ClientThread implements Runnable {
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t开始尝试获取锁");
                try {
                    if (lock.tryLock(2, TimeUnit.SECONDS)) {
                        System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t获取锁成功");
                        TimeUnit.SECONDS.sleep(5);
                    } else {
                        System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t获取锁失败");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    if (lock.isHeldByCurrentThread() && lock.isLocked()) {
                        lock.unlock();
                        System.out.println(Thread.currentThread() + "\t" + (System.currentTimeMillis() / 1000) + "\t完成释放锁");
                    }
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(new ClientThread(), "t1");
            Thread t2 = new Thread(new ClientThread(), "t2");
            Thread t3 = new Thread(new ClientThread(), "t3");
            t1.start();
            t2.start();
            t3.start();
            //t1.interrupt();
    
            TimeUnit.SECONDS.sleep(20);
        }
    }
    

    输出结果:

    Thread[t1,5,main]   1653540581  开始尝试获取锁
    Thread[t3,5,main]   1653540581  开始尝试获取锁
    Thread[t2,5,main]   1653540581  开始尝试获取锁
    Thread[t1,5,main]   1653540581  获取锁成功
    Thread[t3,5,main]   1653540583  获取锁失败
    Thread[t2,5,main]   1653540583  获取锁失败
    Thread[t1,5,main]   1653540586  完成释放锁
    

    条件等待队列使用

    Condition 是在 java 1.5 中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用 Condition,阻塞队列实际上是使用了 Condition 来模拟线程间协作。
    Condition 是个接口,基本的方法就是 await() 和 signal() 方法;
    Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition()
    调用 Condition 的 await() 和 signal() 方法,都必须在 lock 保护之内,就是说必须在 lock.lock() 和 lock.unlock 之间才可以使用:

    • Conditon中的await()对应Object的wait();
    • Condition中的signal()对应Object的notify();
    • Condition中的signalAll()对应Object的notifyAll()。

    测试场景:下面一个场景,需要ABC3个线程,A线程打印1次,然后是B线程打印2次,再是C线程打印3次,线程交替打印。
    ABC线程需要交替执行,我们需要控制,线程的执行先后顺序
    我们可以使用多条件Condition来控制,每一个线程拥有一个condition对象,调用各种的await方法,可以使线程等待,然后让别的线程调用这个condition对象的signal方法,唤醒线程。
    代码如下:

    public class ReentrantLockConditionTest {
    
        private int data = 1;
        private Lock lock = new ReentrantLock();
        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();
        Condition condition3 = lock.newCondition();
    
    
        public void printA() {
            lock.lock();
            try {
                while (data != 1) {
                    condition1.await();
                }
                // 打印5次
                for (int i = 0; i < 5; i++) {
                    System.out.println(Thread.currentThread().getName() + " ->" + data);
                }
                data = 2;
                // 通知B线程
                condition2.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void printB() {
            lock.lock();
            try {
                while (data != 2) {
                    condition2.await();
                }
                // 打印10次
                for (int i = 0; i < 10; i++) {
                    System.out.println(Thread.currentThread().getName() + " ->" + data);
                }
                data = 3;
                // 通知C
                condition3.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void printC() {
            lock.lock();
            try {
                while (data != 3) {
                    condition3.await();
                }
                // 打印15次
                for (int i = 0; i < 15; i++) {
                    System.out.println(Thread.currentThread().getName() + " ->" + data);
                }
                data = 1;
                // 通知A
                condition1.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            ReentrantLockConditionTest conditionTest = new ReentrantLockConditionTest();
            // A,B,C 交替执行
            new Thread(conditionTest::printA, "A").start();
            new Thread(conditionTest::printB, "B").start();
            new Thread(conditionTest::printC, "C").start();
        }
    
    }
    

    输出结果如下:

    A ->1
    B ->2
    B ->2
    C ->3
    C ->3
    C ->3
    

    作者:心城以北
    链接:https://juejin.cn/post/7101956094137729031
    来源:稀土掘金

    相关文章

      网友评论

        本文标题:ReentrantLock 源码分析

        本文链接:https://www.haomeiwen.com/subject/aqqvprtx.html