美文网首页程序员@IT·互联网
ReentrantLock的使用及底层实现原理

ReentrantLock的使用及底层实现原理

作者: 我可能是个假开发 | 来源:发表于2023-12-06 11:30 被阅读0次

    一、基本介绍

    语法:

    // 获取锁
    reentrantLock.lock();
    try {
      // 临界区
    } finally {
      // 释放锁
      reentrantLock.unlock();
    }
    

    特点:

    • 可重入
    • 可中断
    • 可以设置超时时间
    • 可以设置为公平锁
    • 支持多个条件变量(类似Synchronized中的waitset,拿不到锁时进入的waitset中等待,可以支持多个waitset)

    1.可重入

    同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁;如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。

    @Slf4j
    public class ReentrantTest {
        static ReentrantLock lock = new ReentrantLock();
    
        public static void main(String[] args) {
            method1();
        }
    
        public static void method1() {
            lock.lock();
            try {
                log.debug("execute method1");
                method2();
            } finally {
                lock.unlock();
            }
        }
    
        public static void method2() {
            lock.lock();
            try {
                log.debug("execute method2");
                method3();
            } finally {
                lock.unlock();
            }
        }
    
        public static void method3() {
            lock.lock();
            try {
                log.debug("execute method3");
            } finally {
                lock.unlock();
            }
        }
    }
    
    20:22:29.392 [main] DEBUG juc.reentrant.ReentrantTest - execute method1
    20:22:29.396 [main] DEBUG juc.reentrant.ReentrantTest - execute method2
    20:22:29.396 [main] DEBUG juc.reentrant.ReentrantTest - execute method3
    

    实现源码:

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
                else if (current == getExclusiveOwnerThread()) {
                    // state++
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    protected final boolean tryRelease(int releases) {
                // state--
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                // 支持锁重入, 只有 state 减为 0, 才释放成功
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    2.可打断

    在等待锁的过程中,其他线程可以使用interrupt方法中止当前线程的等待。
    可以防止死锁的发生,线程不用一直等下去。

    @Slf4j
    public class InterruptTest {
    
        private static ReentrantLock reentrantLock = new ReentrantLock();
    
        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(()->{
                try {
                    //竞争锁失败进入阻塞队列,可以被其他线程调用interrupt方法打断
                    log.debug("t1线程开始抢锁");
                    reentrantLock.lockInterruptibly();
                } catch (InterruptedException e) {
                    //抢锁失败进入了阻塞队列然后被打断了就进入这个catch块
                    e.printStackTrace();
                    log.debug("t1线程被打断,返回");
                    return;
                }
                try{
                    log.debug("t1线程抢锁成功");
                }finally {
                    reentrantLock.unlock();
                }
            },"t1");
            //主线程先抢锁
            log.debug("main线程开始抢锁");
            reentrantLock.lock();
            log.debug("main线程抢锁成功");
            //t1线程启动开始抢锁,抢锁失败
            t1.start();
            Thread.sleep(1000);
            log.debug("main线程打断正在阻塞的t1线程");
            t1.interrupt();
        }
    }
    
    10:09:47.246 [main] DEBUG juc.reentrant.InterruptTest - main线程开始抢锁
    10:09:47.248 [main] DEBUG juc.reentrant.InterruptTest - main线程抢锁成功
    10:09:47.249 [t1] DEBUG juc.reentrant.InterruptTest - t1线程开始抢锁
    10:09:48.250 [main] DEBUG juc.reentrant.InterruptTest - main线程打断正在阻塞的t1线程
    10:09:48.251 [t1] DEBUG juc.reentrant.InterruptTest - t1线程被打断,返回
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
        at juc.reentrant.InterruptTest.lambda$main$0(InterruptTest.java:25)
        at java.lang.Thread.run(Thread.java:748)
    

    进入synchronized临界区的代码可以被打断,lockInterruptibly是获取锁失败被阻塞的过程可以被打断。

    不可打断模式:
    在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

        private final boolean parkAndCheckInterrupt() {
            // 如果打断标记已经是 true, 则 park 会失效
            LockSupport.park(this);
            // interrupted 会清除打断标记
            return Thread.interrupted();
        }
    
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        // 需要获得锁后, 才能返回打断状态
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                // 如果打断状态为 true
                selfInterrupt();
        }
    
        static void selfInterrupt() {
            // 重新产生一次中断
            Thread.currentThread().interrupt();
        }
    

    可打断模式:

        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 如果没有获得到锁, 进入 ㈠
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    
    // ㈠ 可打断的获取锁流程
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    // 在 park 过程中如果被 interrupt 会进入此,这时候抛出异常, 不会再次进入 for (;;)
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    3.设置超时时间

    @Slf4j
    public class TimeoutTest {
        public static void main(String[] args) {
            test1();
        }
    
        private static void test1() {
            ReentrantLock lock = new ReentrantLock();
            Thread t1 = new Thread(() -> {
                log.debug("t1线程启动...");
                try {
                    //获取不到会等待timeout时间,如果再抢不到就返回
                    if (!lock.tryLock(1, TimeUnit.SECONDS)) {
                        log.debug("尝试获取锁,等待 1s 后,失败,返回");
                        return;
                    }
                } catch (InterruptedException e) {
                    log.debug("t1线程被打断,获取锁失败",e);
                    return;
                }
                try {
                    log.debug("t1抢锁成功");
                } finally {
                    lock.unlock();
                }
            }, "t1");
    
            lock.lock();
            log.debug("main抢锁成功");
            t1.start();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        private static void test2() {
            ReentrantLock lock = new ReentrantLock();
            Thread t1 = new Thread(() -> {
                log.debug("t1线程启动...");
                //获取不到不会等待
                if (!lock.tryLock()) {
                    log.debug("t1线程抢锁失败,返回");
                    return;
                }
                try {
                    log.debug("获得了锁");
                } finally {
                    lock.unlock();
                }
            }, "t1");
    
            lock.lock();
            log.debug("main线程抢锁成功");
            t1.start();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    

    使用锁超时解决哲学家就餐问题

    public class DealPhilosopherMeal {
    
        public static void main(String[] args) {
            Chopstick c1 = new Chopstick("1");
            Chopstick c2 = new Chopstick("2");
            Chopstick c3 = new Chopstick("3");
            Chopstick c4 = new Chopstick("4");
            Chopstick c5 = new Chopstick("5");
            new Philosopher("苏格拉底", c1, c2).start();
            new Philosopher("柏拉图", c2, c3).start();
            new Philosopher("亚里士多德", c3, c4).start();
            new Philosopher("赫拉克利特", c4, c5).start();
            new Philosopher("阿基米德", c5, c1).start();
        }
    
    }
    
    /**
     * 哲学家类
     */
    @Slf4j
    class Philosopher extends Thread {
        Chopstick left;
        Chopstick right;
    
        public Philosopher(String name, Chopstick left, Chopstick right) {
            super(name);
            this.left = left;
            this.right = right;
        }
    
        @Override
        public void run() {
            while (true) {
                // 尝试获得左手筷子
                if(left.tryLock()){
                    try {
                        // 尝试获得右手筷子
                        if (right.tryLock()) {
                            try{
                                eat();
                            }finally {
                                right.unlock();
                            }
                        }
                    }finally {
                        left.unlock();
                    }
                }
    
    //            // 尝试获得左手筷子
    //            synchronized (left) {
    //                // 尝试获得右手筷子
    //                synchronized (right) {
    //                    eat();
    //                }
    //            }
            }
        }
    
        Random random = new Random();
        private void eat(){
            log.debug("eating...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 筷子类
     */
    class Chopstick extends ReentrantLock {
        String name;
    
        public Chopstick(String name) {
            this.name = name;
        }
    
        @Override
        public String toString() {
            return "筷子{" + name + '}';
        }
    }
    
    11:21:48.096 [柏拉图] DEBUG juc.reentrant.Philosopher - eating...
    11:21:48.097 [赫拉克利特] DEBUG juc.reentrant.Philosopher - eating...
    11:21:49.104 [亚里士多德] DEBUG juc.reentrant.Philosopher - eating...
    11:21:49.104 [阿基米德] DEBUG juc.reentrant.Philosopher - eating...
    11:21:50.106 [柏拉图] DEBUG juc.reentrant.Philosopher - eating...
    11:21:50.106 [赫拉克利特] DEBUG juc.reentrant.Philosopher - eating...
    11:21:51.107 [苏格拉底] DEBUG juc.reentrant.Philosopher - eating...
    11:21:51.107 [赫拉克利特] DEBUG juc.reentrant.Philosopher - eating...
    11:21:52.110 [苏格拉底] DEBUG juc.reentrant.Philosopher - eating...
    ···
    

    4.设置为公平锁

        public ReentrantLock() {
            sync = new NonfairSync();
        }
        /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    默认为非公平锁,通过传递一个布尔参数来实现公平。

    • 好处:可以解决饥饿问题
    • 坏处:降低了并发度

    实现原理:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    // 与非公平锁主要区别在于 tryAcquire 方法的实现
    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
        // (一)
        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            // h != t 时表示队列中有 Node
            return h != t &&
                // (s = h.next) == null 表示队列中还有没有老二或者队列中老二线程不是此线程
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    5.支持多个条件变量

    synchronized 中的条件变量,就是waitSet ,当条件不满足时进入 waitSet 等待,状态为waiting。
    ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量,synchronized 是那些不满足条件的线程都在一个集合等消息,而 ReentrantLock 支持多个集合,唤醒时也是按集合来唤醒。
    每个条件变量对应着一个等待队列,其实现类是 ConditionObject

    使用要点:

    • await 前需要获得锁
    • await 执行后,会释放锁,进入 conditionObject 等待
    • await 的线程被唤醒(或打断、或超时)去重新竞争 lock 锁
    • 竞争 lock 锁成功后,从 await 后继续执行
      基本使用:
    public class TestAwait {
        static ReentrantLock reentrantLock = new ReentrantLock();
        public static void main(String[] args) throws InterruptedException {
            Condition condition1 = reentrantLock.newCondition();
            Condition condition2 = reentrantLock.newCondition();
            reentrantLock.lock();
            //进入条件变量中等待
            condition1.await();
            condition1.signalAll();
        }
    }
    
    @Slf4j
    public class TestAwaitSignal {
    
        static boolean hasEat = false;
        static boolean hasDrink = false;
    
        static ReentrantLock room = new ReentrantLock();
        //等喝的waitSet
        static Condition drinkWaitSet = room.newCondition();
        //等吃的waitSet
        static Condition eatWaitSet = room.newCondition();
    
        public static void main(String[] args) throws InterruptedException {
    
            new Thread(() -> {
                room.lock();
                try {
                    log.debug("有吃的没?[{}]", hasEat);
                    while (!hasEat) {
                        log.debug("没吃的,先歇会!");
                        try {
                            eatWaitSet.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if (hasEat) {
                        log.debug("有吃的,可以开始干活了");
                    } else {
                        log.debug("没干成活...");
                    }
                }finally {
                    room.unlock();
                }
    
            }, "小红").start();
    
            new Thread(() -> {
                room.lock();
                try {
                    log.debug("有喝的没?[{}]", hasDrink);
                    while (!hasDrink) {
                        log.debug("没喝的,先歇会!");
                        try {
                            drinkWaitSet.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if (hasDrink) {
                        log.debug("有喝的,可以开始干活了");
                    } else {
                        log.debug("没干成活...");
                    }
                }finally {
                    room.unlock();
                }
            }, "小明").start();
    
            Thread.sleep(1000);
    
            new Thread(() -> {
                room.lock();
                try {
                    hasDrink = true;
                    log.debug("喝的送到");
                    drinkWaitSet.signal();
                } finally {
                    room.unlock();
                }
            }, "送喝的").start();
    
            new Thread(() -> {
                room.lock();
                try {
                    hasEat = true;
                    log.debug("吃的送到");
                    eatWaitSet.signal();
                } finally {
                    room.unlock();
                }
            }, "送吃的").start();
    
        }
    }
    
    14:53:18.641 [小红] DEBUG juc.reentrant.TestAwaitSignal - 有吃的没?[false]
    14:53:18.644 [小红] DEBUG juc.reentrant.TestAwaitSignal - 没吃的,先歇会!
    14:53:18.644 [小明] DEBUG juc.reentrant.TestAwaitSignal - 有喝的没?[false]
    14:53:18.644 [小明] DEBUG juc.reentrant.TestAwaitSignal - 没喝的,先歇会!
    14:53:19.645 [送喝的] DEBUG juc.reentrant.TestAwaitSignal - 喝的送到
    14:53:19.645 [小明] DEBUG juc.reentrant.TestAwaitSignal - 有喝的,可以开始干活了
    14:53:19.646 [送吃的] DEBUG juc.reentrant.TestAwaitSignal - 吃的送到
    14:53:19.646 [小红] DEBUG juc.reentrant.TestAwaitSignal - 有吃的,可以开始干活了
    

    await流程
    1.开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的addConditionWaiter 流程,创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

    1.png

    2.进入 AQS 的 fullyRelease 流程,释放同步器上的锁


    2.png

    3.unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功


    3.png

    4.park 阻塞 Thread-0


    4.png

    signal 流程
    1.Thread-1 要来唤醒 Thread-0

    image.png
    2.进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即Thread-0 所在 Node
    image.png

    3.执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的waitStatus 改为 0,Thread-3 的waitStatus 改为 -1


    image.png

    4.Thread-1 释放锁,进入 unlock 流程

    二、固定顺序运行

    让t2线程先执行:

    1.使用wait/notify

    @Slf4j
    public class OrderRunTest {
        static final Object lock = new Object();
        // 表示 t2 是否运行过
        static boolean t2RunFlag = false;
    
        public static void main(String[] args) {
            Thread t1 = new Thread(() -> {
                synchronized (lock) {
                    while (!t2RunFlag) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("t1执行");
                }
            }, "t1");
    
            Thread t2 = new Thread(() -> {
                synchronized (lock) {
                    log.debug("t2执行");
                    t2RunFlag = true;
                    lock.notify();
                }
            }, "t2");
    
            t1.start();
            t2.start();
        }
    }
    
    15:26:05.771 [t2] DEBUG juc.reentrant.OrderRunTest - t2执行
    15:26:05.775 [t1] DEBUG juc.reentrant.OrderRunTest - t1执行
    

    2.使用await/signal

    @Slf4j
    public class OrderRunTest2 {
        static ReentrantLock reentrantLock = new ReentrantLock();
        static Condition waitSet = reentrantLock.newCondition();
        // 表示 t2 是否运行过
        static boolean t2RunFlag = false;
    
        public static void main(String[] args) {
            Thread t1 = new Thread(() -> {
                reentrantLock.lock();
                try {
                    while (!t2RunFlag) {
                        try {
                            waitSet.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("t1执行");
                }finally {
                    reentrantLock.unlock();
                }
            }, "t1");
    
            Thread t2 = new Thread(() -> {
                reentrantLock.lock();
                try {
                    log.debug("t2执行");
                    t2RunFlag = true;
                    waitSet.signal();
                }finally {
                    reentrantLock.unlock();
                }
            }, "t2");
    
            t1.start();
            t2.start();
        }
    }
    

    3.使用park/unpark

    public class Test {
        public static void main(String[] args) {
            Thread t1 = new Thread(() -> {
                LockSupport.park();
                log.debug("1");
            }, "t1");
            t1.start();
    
            new Thread(() -> {
                log.debug("2");
                LockSupport.unpark(t1);
            },"t2").start();
        }
    }
    

    三、交替运行

    1.使用wait/notify

    @Slf4j
    public class RotateTest {
        public static void main(String[] args) {
            WaitNotify wn = new WaitNotify(1, 5);
            new Thread(() -> wn.print("a", 1, 2)).start();
            new Thread(() -> wn.print("b", 2, 3)).start();
            new Thread(() -> wn.print("c", 3, 1)).start();
        }
    }
    
    /*
    输出内容       等待标记     下一个标记
       a           1             2
       b           2             3
       c           3             1
     */
    @Slf4j
    class WaitNotify {
        // 打印               a           1             2
        public void print(String str, int waitFlag, int nextFlag) {
            for (int i = 0; i < loopNumber; i++) {
                synchronized (this) {
                    while(flag != waitFlag) {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.print(str);
                    flag = nextFlag;
                    this.notifyAll();
                }
            }
        }
    
        // 等待标记
        private int flag; // 2
        // 循环次数
        private int loopNumber;
    
        public WaitNotify(int flag, int loopNumber) {
            this.flag = flag;
            this.loopNumber = loopNumber;
        }
    }
    
    abcabcabcabcabc
    

    2.使用await/signal

    public class RotateTest1 {
        public static void main(String[] args) throws InterruptedException {
    
            AwaitSignal awaitSignal = new AwaitSignal(5);
            Condition a = awaitSignal.newCondition();
            Condition b = awaitSignal.newCondition();
            Condition c = awaitSignal.newCondition();
    
            new Thread(() -> awaitSignal.print("a", a, b), "t1").start();
            new Thread(() -> awaitSignal.print("b", b, c), "t2").start();
            new Thread(() -> awaitSignal.print("c", c, a), "t3").start();
    
            Thread.sleep(1000);
            awaitSignal.lock();
            try {
                a.signal();
            } finally {
                awaitSignal.unlock();
            }
        }
    }
    
    class AwaitSignal extends ReentrantLock{
    
        // 循环次数
        private int loopNumber;
    
        public AwaitSignal(int loopNumber){
            this.loopNumber = loopNumber;
        }
    
        /**
         * 打印
         * @param str 打印的字符
         * @param current 当前条件变量
         * @param next 下一个条件变量
         */
        public void print(String str,Condition current,Condition next){
            for (int i = 0; i < loopNumber; i++) {
                lock();
                try{
                    try {
                        current.await();
                        System.out.print(str);
                        next.signal();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }finally {
                    unlock();
                }
            }
        }
    }
    

    3.使用park/unpark

    @Slf4j
    public class RotateTest2 {
    
        static Thread t1;
        static Thread t2;
        static Thread t3;
        public static void main(String[] args) {
            ParkUnpark pu = new ParkUnpark(5);
            t1 = new Thread(() -> pu.print("a", t2));
            t2 = new Thread(() -> pu.print("b", t3));
            t3 = new Thread(() -> pu.print("c", t1));
            t1.start();
            t2.start();
            t3.start();
    
            LockSupport.unpark(t1);
        }
    }
    
    class ParkUnpark {
        public void print(String str, Thread next) {
            for (int i = 0; i < loopNumber; i++) {
                LockSupport.park();
                System.out.print(str);
                LockSupport.unpark(next);
            }
        }
    
        private int loopNumber;
    
        public ParkUnpark(int loopNumber) {
            this.loopNumber = loopNumber;
        }
    }
    

    四、非公平锁实现原理

    构造器默认为非公平锁实现:

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    

    NonfairSync 继承自 AQS
    没有竞争时:

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    image.png

    3.Thread-1 执行了:

    • 1.CAS 尝试将 state 由 0 改为 1,结果失败
    • 2.进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
    • 3.进入 addWaiter 逻辑,构造 Node 队列
      黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
      Node 的创建是懒惰的
      第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    3.png

    4.当前线程进入 acquireQueued 逻辑:

    • 1.acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
    • 2.如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,这时 state 仍为 1,失败
    • 3.进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的waitStatus 改为 -1(需要唤醒后继节点),返回 false
    • 4.shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
    • 5.当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
    • 6.进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
    final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final AbstractQueuedSynchronizer.Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    4.png

    5.再次有多个线程经历上述过程竞争失败:


    image.png

    Thread-0 释放锁,进入 tryRelease 流程:

    • 设置 exclusiveOwnerThread 为 null
    • state = 0


      image.png
    • 当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
    • 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1

    回到 Thread-1 的 acquireQueued 流程:

    • 如果加锁成功(没有竞争),会设置
      1.exclusiveOwnerThread 为 Thread-1,state = 1
      2.head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
      3.原本的 head 因为从链表断开,而可被垃圾回收


      image.png

    如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了:


    image.png

    如果被 Thread-4 先占了:

    • Thread-4 被设置为 exclusiveOwnerThread,state = 1
    • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

    是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的waitStatus 决定

    加锁源码:

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                //首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    // 如果尝试失败,进入 ㈠
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
        // ㈠ AQS 继承过来的方法
        public final void acquire(int arg) {
            // ㈡ tryAcquire
            if (!tryAcquire(arg) &&
                // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
            // ㈡ 进入 ㈢
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
          // ㈢ Sync 继承过来的方法
          final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                // 如果还没有获得锁
                if (c == 0) {
                    // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
                else if (current == getExclusiveOwnerThread()) {
                    // state++
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                // 获取失败, 回到调用处
                return false;
            }
    
        // ㈣ AQS 继承过来的方法
        private Node addWaiter(Node mode) {
            // 将当前线程关联到一个 Node 对象上, 模式为独占模式
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    // 双向链表
                    pred.next = node;
                    return node;
                }
            }
            // 尝试将 Node 加入 AQS, 进入 ㈥
            enq(node);
            return node;
        }
    
        // ㈥ AQS 继承过来的方法
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // cas 尝试将 Node 对象加入 AQS 队列尾部
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
        // ㈤ AQS 继承过来的方法
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
                    if (p == head && tryAcquire(arg)) {
                        // 获取成功, 设置自己(当前线程对应的 node)为 head
                        setHead(node);
                        // 上一个节点 help GC
                        p.next = null; // help GC
                        failed = false;
                        // 返回中断标记 false
                        return interrupted;
                    }
                    // 判断是否应当 park, 进入 ㈦
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        // ㈦ AQS 继承过来的方法
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获取上一个节点的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                // 上一个节点在阻塞, 那么自己也阻塞
                return true;
            // > 0 表示取消状态
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                // 这次还没有阻塞
                // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
        // ㈧ 阻塞当前线程
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    解锁源码:

        public void unlock() {
            sync.release(1);
        }
    
        public final boolean release(int arg) {
            // 尝试释放锁, 进入 ㈠
            if (tryRelease(arg)) {
                // 队列头节点 unpark
                Node h = head;
                // 队列不为 null ;waitStatus == Node.SIGNAL 才需要 unpark
                if (h != null && h.waitStatus != 0)
                    // unpark AQS 中等待的线程, 进入 ㈡
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
            // ㈠
            protected final boolean tryRelease(int releases) {
                // state--
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                // 支持锁重入, 只有 state 减为 0, 才释放成功
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
        // (二)
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            // 如果状态为 Node.SIGNAL 尝试重置状态为 0 不成功也可以
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
            Node s = node.next;
            // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    相关文章

      网友评论

        本文标题:ReentrantLock的使用及底层实现原理

        本文链接:https://www.haomeiwen.com/subject/opdxgdtx.html