美文网首页Java基础
并发编程(六)ReentrantlLock实现原理

并发编程(六)ReentrantlLock实现原理

作者: Timmy_zzh | 来源:发表于2021-01-12 21:33 被阅读0次
    AQS
    • AQS全称是Abstract Queued Synchronizer,翻译为同步器,它是一套实现多线程同步功能的框架。AQS在源码中被广泛使用,特别是在java高并发编程中,比如ReentrantLock,Semaphore,CountDownLatch和ThreadPoolExecutor,在实际开发中也可以通过自定义AQS来实现各种需求场景。

    • ReentrantLock的同步功能的原理就是通过AQS来实现的

    1.ReentrantLock的加锁方法lock()

    public class ReentrantLock implements Lock, Serializable {
        
        private final ReentrantLock.Sync sync;
    
        public ReentrantLock() {
            this.sync = new ReentrantLock.NonfairSync();
        }
    
        public ReentrantLock(boolean fair) {
            this.sync = (ReentrantLock.Sync)(fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
        }
    
        public void lock() {
            this.sync.lock();
        }
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
    
            abstract void lock();
    
            final boolean nonfairTryAcquire(int acquire) {}
    
            protected final boolean tryRelease(int release) {}
    
            protected final boolean isHeldExclusively() {}
    
            final ConditionObject newCondition() {}
    
            final Thread getOwner() {}
    
            final int getHoldCount() {}
    
            final boolean isLocked() {}
    
            private void readObject(ObjectInputStream var1) {}
        }
    }
    
    • 解析
      • ReentrantLock的lock方法调用的是sync的lock方法,sync是ReentrantLock的变量Sync的对象实例
      • Sync是ReentrantLock的静态内部类,该类继承自AbstractQueuedSynchronizer
      • Sync类有有两个实现类-NonfairSync 和 FairSync,这两个类也是ReentrantLock的静态内部类,分别对应非公平锁和公平锁。
        • 这两个类通过ReentrantLock的带参构造函数确定实现,默认sync为非公平锁NonfairSync实现
    NonfairSync实现源码如下:
        static final class NonfairSync extends ReentrantLock.Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            NonfairSync() {
            }
    
            final void lock() {
                //1.通过cas操作来修改state状态,表示争抢锁的操作
                if (this.compareAndSetState(0, 1)) {
                    //2.修改状态成功,则设置当前获得锁状态的线程为当前执行线程
                    this.setExclusiveOwnerThread(Thread.currentThread());
                } else {
                    //3.修改状态失败,调用AQS的acquire方法进行后续处理
                    this.acquire(1);
                }
            }
    
            protected final boolean tryAcquire(int var1) {
                return this.nonfairTryAcquire(var1);
            }
        }
    
    • 解析:在非公平锁NonfairSync的lock方法中,主要操作如下
      • 调用compareAndSetState方法,通过cas设置变量state(同步状态)成功,表示当前线程获取锁成功,则将当前线程设置为独占线程。
      • 如果通过cas设置变量state成功,表示当前锁正在被其他线程持有,则调用AQS的acquire方法进行后续操作
    AbstractQueuedSynchronizer的acquire方法
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
        
        private transient volatile AbstractQueuedSynchronizer.Node head;
        private transient volatile AbstractQueuedSynchronizer.Node tail;
        private volatile int state;
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        
        protected AbstractQueuedSynchronizer() {
        }
    
        protected final int getState() {
            return this.state;
        }
    
        protected final void setState(int var1) {
            this.state = var1;
        }
    
        protected final boolean compareAndSetState(int var1, int var2) {
            return unsafe.compareAndSwapInt(this, stateOffset, var1, var2);
        }
        
        public final void acquire(int var1) {
            if (!this.tryAcquire(var1) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), var1)) {
                selfInterrupt();
            }
        }
        
        protected boolean tryAcquire(int var1) {
            throw new UnsupportedOperationException();
        }
    }
    
    • 解析:AQS的acquire方法比较重要,该方法中主要有3个重要步骤
      • tryAcquire 方法主要目的是尝试获取锁
        • AQS中的tryAcquire 比较特殊,默认情况下会直接抛异常,因此需要在子类复写,所以重点看非公平锁NonfairSync的tryAcquire 的代码实现
      • 如果当前线程调用tryAcquire 获取锁失败,则会调用addWaiter()方法,该方法的作用是将当前线程添加到一个等待队列中,并返回新添加队列的节点Node
      • 接着会调用acquireQueued 方法,将加入到队列中的节点,通过自旋去尝试获取锁,根据情况将线程挂起或者取消。
    NonfairSync的tryAcquire方法
    • NonfairSync的tryAcquire会调用Sync的nonfairTryAcquire方法
        static final class NonfairSync extends ReentrantLock.Sync {
            
            protected final boolean tryAcquire(int acquires) {
                return this.nonfairTryAcquire(acquires);
            }
        }
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
    
            abstract void lock();
    
            final boolean nonfairTryAcquire(int acquires) {
                //获取当前执行的线程
                Thread current = Thread.currentThread();
                //获得state的值
                int state = this.getState();
                // state为0,表示当前无锁状态
                if (state == 0) {
                    //无锁状态下,通过cas操作将state的值设置为1,设置当前线程持有锁
                    if (this.compareAndSetState(0, acquires)) {
                        this.setExclusiveOwnerThread(current);
                        return true;
                    }
                } 
                // 如果state不为0,说明当前已有线程持有锁,如果当前持有锁的线程就是当前线程
                // 则增加锁的重入次数,将state加1并重新设置
                else if (current == this.getExclusiveOwnerThread()) {
                    int newState = state + acquires;
                    if (newState < 0) {
                        throw new Error("Maximum lock count exceeded");
                    }
                    this.setState(newState);
                    return true;
                }
                return false;
            }
        }
    
    • 解析:tryAcquire方法的作用是尝试获取锁,并返回获取锁的结果是否成功
      • AQS中有一个变量state,表示当前锁的状态
      • 如果state为0表示当前是无锁状态,通过cas更新state状态的值为1,并设置当前线程持有锁,并返回true,表示获取锁成功
      • 如果当前线程属于重入,则增加重入次数,返回true
      • 如果上诉情况都不满足,则获取锁失败,返回false

    ReentrantLock的lock方法调用流程图:

    1.ReentrantLock的lock加锁流程图.png
    • 解析
      • 在ReentrantLock执行lock()的过程中,大部分同步机制的核心逻辑都已经在AQS中实现,ReentrantLock自身只要实现某些特定步骤下的方法即可,这种设计模式叫做模板模式
        • 这一设计模式在Android的Activity生命周期也有使用,因为Activity的生命周期的执行流程都已经在framework中定义好了,子类Activity只要在相应的onCreate,onPause等生命周期方法中提供相应的实现即可。
      • JUC包中的其他组件如CountDownLatch,Semaphor等都是通过一个内部类Sync来继承AQS,然后在内部中通过操作Sync来实现同步。
        • 这种做法的好处是将线程控制的逻辑控制在Sync内部,而对外面向用户提供的接口是自定义锁,这中聚合关系能够很好的解耦两者所关注的逻辑。

    2.AQS核心功能原理分析

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
        private transient volatile AbstractQueuedSynchronizer.Node head;
        private transient volatile AbstractQueuedSynchronizer.Node tail;
        
        private volatile int state;
        
        static final class Node {
            ...
        }
    }
    
    • AQS类中有几个关键的属性,Node和state
    2.1.state 锁状态
    • state表示当前锁状态。
      • 当state=0时表示无锁状态;
      • 当state>0时,表示已经有线程获得了锁,也就是state=1
      • 如果同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5.
        • 而在释放锁的时候,同样需要释放5次直到state=0,其他线程才有资格获得锁。

    state还有一个功能是实现锁的独占模式或者共享模式

    • 独占模式:只有一个线程能够持有同步锁
      • 比如在独占模式下,我们可以把state的初始值设置为0,当某个线程申请锁对象时,需要判断state的值是不是0,如果不是0的话意味着其他线程已经持有该锁,则本线程需要阻塞等待。
    • 共享模式:可以有多个线程持有同步锁
      • 在共享模式下,比如某项操作我们允许10个线程同时进行,超过这个数量的线程就需要阻塞等待。
      • 那么只需要在线程申请对象时判断state的值是否小于10,如果小于10,就将state加1后继续同步语句的执行。
      • 如果等于10,说明已经有10个线程在同时执行该操作,本线程需要阻塞等待
    2.2.Node双端队列节点

    Node时一个先进先出的双端队列,并且是等待队列,当多线程争用资源被阻塞时会进入此队列。这个队列时AQS实现多线程同步的核心。

    AQS中有两个Node指针,分别指向队列的head和tail,分别对应头节点和尾节点。

    • Node类结构如下:
        static final class Node {
            // 该等待同步的节点处于共享模式
            static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
            // 该等待同步的节点处于独占模式
            static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
            
            // Node中的线程状态,和AQS中的state不同,值范围有1,0,-1,-2,-3五个之
            static final int CANCELLED = 1;
            static final int SIGNAL = -1;
            static final int CONDITION = -2;
            static final int PROPAGATE = -3;
            volatile int waitStatus;
            // 前驱节点
            volatile AbstractQueuedSynchronizer.Node prev;
            // 后继节点
            volatile AbstractQueuedSynchronizer.Node next;
            // 等待锁的线程
            volatile Thread thread;
            // 用于标记当前当前节点是否是共享模式,在Node构造函数中赋值
            AbstractQueuedSynchronizer.Node nextWaiter;
    
            final boolean isShared() {
                return this.nextWaiter == SHARED;
            }
    
            final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
                AbstractQueuedSynchronizer.Node var1 = this.prev;
                if (var1 == null) {
                    throw new NullPointerException();
                } else {
                    return var1;
                }
            }
    
            Node() {
            }
    
            Node(Thread var1, AbstractQueuedSynchronizer.Node var2) {
                this.nextWaiter = var2;
                this.thread = var1;
            }
    
            Node(Thread var1, int var2) {
                this.waitStatus = var2;
                this.thread = var1;
            }
        }
    

    3.当前线程获取锁失败后的流程分析

    锁的意义就是使竞争到锁对象的线程执行同步代码,多个线程竞争锁时,竞争失败的线程需要被阻塞等待后续唤醒。那么ReentrantLock时如何实现让线程等待并唤醒的呢?

    • 在ReentrantLock.lock()方法中,会调用Sync的acquire()方法,接着调用AQS的tryAcquire 获取锁.
      • tryAcquire方法需要子类(Sync)复写并实现,如果返回true说明获取锁成功,继续执行同步代码块,
      • 如果tryAcquire方法返回false,也就是当前锁对象被其他线程所持有,会接着调用AQS的addWaiter和acquireQueued方法
    3.1.AQS的addWaiter方法

    当前线程获取锁失败后,会被添加到一个等待队列的末端,源码如下:

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
        
        private transient volatile AbstractQueuedSynchronizer.Node head;
        private transient volatile AbstractQueuedSynchronizer.Node tail;
        private volatile int state;
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        
        //将线程以Node的方式添加到队列中,添加在tail节点后面
        private AbstractQueuedSynchronizer.Node addWaiter(AQS.Node var1) {
            // 把当前线程封装到一个新的Node对象中
            AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), var1);
            AbstractQueuedSynchronizer.Node pred = this.tail;
            // 将node插入队列
            if (pred != null) {
                node.prev = pred;   //前继节点为tail
                if (this.compareAndSetTail(pred, node)) {   
                    //cas替换当前尾部,成功则返回true并设置tail的next后继节点为新节点
                    pred.next = node;
                    return node;
                }
            }
            // 插入队列失败,进入enq自旋重试入队
            this.enq(node);
            return node;
        }
    
        //插入新节点到队列中,如果队列未初始化则先进行初始化,然后再插入
        private AbstractQueuedSynchronizer.Node enq(AbstractQueuedSynchronizer.Node node) {
            while(true) {
                AbstractQueuedSynchronizer.Node t = this.tail;
                if (t == null) {
                    // 如果队列从未被初始化,则初始化一个头节点head,并赋值给tail
                    if (this.compareAndSetHead(new AbstractQueuedSynchronizer.Node())) {
                        this.tail = this.head;
                    }
                } else {
                    // 将新节点node插入到tail节点的后面
                    node.prev = t;
                    if (this.compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    }
    
    • 有两种i情况会导致插入队列失败:
      • tail为空:说明队列从未被初始化,因此需要调用enq方法再队列中插入一个空的Node
      • compareAndSetTail失败:说明插入过程中有线程修改了此队列,因此需要调用enq将当前node重新插入到队列末端。
    • 经过addWaiter方法之后,此时线程以Node方法被加入到队列的末端,但是线程还没有被执行阻塞操作,真正的阻塞操作在方法acquireQueued中判断执行
    3.2.AQS的acquireQueued 方法
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
        
        private transient volatile AbstractQueuedSynchronizer.Node head;
        private transient volatile AbstractQueuedSynchronizer.Node tail;
        private volatile int state;
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        
        final boolean acquireQueued(AbstractQueuedSynchronizer.Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                while(true) {
                    // 获取新节点的前继节点
                    AbstractQueuedSynchronizer.Node preNode = node.predecessor();
                    
                    // 检测当前节点的前驱节点是否是头节点head,这是获取锁的资格
                    // 如果是头节点的话,则调用tryAcquire尝试获取锁
                    // 获取锁成功,则将头节点设置为当前节点
                    if (preNode == this.head && this.tryAcquire(arg)) {
                        this.setHead(node);
                        preNode.next = null;
                        failed = false;
                        boolean result = interrupted;
                        return result;
                    }
    
                    // 尝试获取锁失败,则根据前驱节点判断是否要阻塞
                    // 如果阻塞过程中被中断,则设置interrupted标志位为true
                    // shouldParkAfterFailedAcquire 方法在前驱节点状态不为SIGNAL的情况下都会循环重试获取锁
                    if (shouldParkAfterFailedAcquire(preNode, node) && this.parkAndCheckInterrupt()) {
                        interrupted = true;
                    }
                }
            } finally {
                if (failed) {
                    this.cancelAcquire(node);
                }
            }
        }
    }
    
    • 解析
      • 在acquireQueued方法中并不会立即挂起该节点中的线程,因此在插入节点的过程中,之前持有锁的线程可能已经执行完毕并释放锁,所以这里使用自旋再次去尝试获取锁。
      • 如果自旋操作还是没有获取到锁,那么就将该线程挂起(阻塞)
    // 根据前驱节点中的waitStatus 来判断是否需要阻塞当前线程
    private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
        // 获取前驱节点的状态
        int ws = pred.waitStatus;
        // 如果前驱节点的状态是SIGNAL(-1),则返回true,表示需要将当前线程挂起
        if (ws == -1) {
            return true;
        } else {
            if (ws > 0) {
                // 前驱节点状态为取消,向前遍历,更新当前节点的前驱为往前第一个非取消节点
                // 当前线程之后会再次回到循环并尝试获取锁
                do {
                    node.prev = pred = pred.prev;
                } while(pred.waitStatus > 0);
                pred.next = var1;
            } else {
                // 等待状态为0 或者 PROPAGATE(-3),设置前驱节点的等待状态为SIGNAL,
                // 并且之后会回到循环再次重试获取锁
                compareAndSetWaitStatus(pred, ws, -1);
            }
            return false;
        }
    }
    
    • 解析:
      • 在shouldParkAfterFailedAcquire 方法中判断当前线程是否应该被挂起,主要是通过前驱节点的waitStatus(等待状态)的值,Node中的等待状态一共有5种取值,代表的意义如下:
    waitStatu值 描述
    CANCELLED(1) 当前线程因为超时或者中断被取消。这时一个终结态,也就是状态到此为止
    SIGNAL(-1) 当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程,这个状态一般都是后继线程来设置前驱节点的
    CONDITION(-2) 当前线程在condition队列种
    PROPAGATE(-3) 用于将唤醒后的线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点称为头节点之前,是不会跃迁为此状态的
    0 表示无锁状态
    • shouldParkAfterFailedAcquire 方法中,根据waitStatus 不同的值进行不同的操作,主要有以下几种情况:
      • 如果waitStatus等于SIGNAL,返回true,将当前线程挂起,等待后续唤醒操作即可
      • 如果waitStatus大于0也就是CANCEL状态,会将此前驱节点从队列中删除,并在循环中逐步寻找下一个不是cancel状态的节点作为当前节点的前驱节点
      • 如果waitStatus既不是SIGNAL也不是CANCEL,则将当前节点的前驱节点状态设置为SIGNAL,这样做的好处是下一次执行shouldParkAfterFailedAcquire 时可以直接返回true,挂起线程。
    3.3.线程挂起操作
    • 在acquireQueued方法中,如果shouldParkAfterFailedAcquire 方法返回true表示线程需要被挂起,那么会继续调用parkAndCheckInterrupt 方法执行真正的阻塞线程代码
    
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
        
    public class LockSupport {
        public static void park(Object var0) {
            Thread var1 = Thread.currentThread();
            setBlocker(var1, var0);
            UNSAFE.park(false, 0L);
            setBlocker(var1, (Object)null);
        }
    }
    
    • 解析:
      • 线程挂起方法比较简单,知识调用了LockSupport中的park方法,在park方法中调用Unsafe的native方法将线程挂起,该方法会执行操作系统中的线程挂起流程
    获取锁流程总结:
    1. AQS的模板方法acquire通过调用子类自定义实现的tryAcquire获取锁
    2. 如果获取锁失败,通过addWaiter方法将线程构造成Node节点插入到同步队列队尾
    3. 在acquireQueued方法中以自旋的方法尝试获取锁,如果失败则判断是否需要将当前线程阻塞,如果需要阻塞则最终执行LockSupport中的native方法来实现线程阻塞。

    4.释放锁操作

    • 释放锁调用了ReentrantLock的unlock方法,该方法需要将上面获取锁失败的线程进行唤醒,并重新执行。具体AQS是何时尝试唤醒等待队列中被阻塞的线程呢?
    public class ReentrantLock implements Lock, Serializable {
        public void unlock() {
            this.sync.release(1);
        }
        
        abstract static class Sync extends AbstractQueuedSynchronizer {
    
            protected final boolean tryRelease(int arg) {
                int newState = this.getState() - arg;
                // 判断当前线程是否是持有锁的线程,不是则直接抛出异常
                if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
                    throw new IllegalMonitorStateException();
                } else {
                    boolean isRelease = false;
                    // 锁状态为0,表示当前尝试释放锁成功,接着执行真正的锁释放操作流程
                    if (newState == 0) {
                        isRelease = true;
                        this.setExclusiveOwnerThread((Thread)null);
                    }
                    this.setState(newState);
                    return isRelease;
                }
            }
        }
    }
    
    public abstract class AbstractQueuedSynchronizer{ 
        public final boolean release(int arg) {
            if (this.tryRelease(arg)) {
                AbstractQueuedSynchronizer.Node h = this.head;
                if (h != null && h.waitStatus != 0) {
                    this.unparkSuccessor(h);
                }
                return true;
            } else {
                return false;
            }
        }
    }
    
    • 解析:
      • 锁释放操作会调用AQS的release方法,首先会调用子类AQS的实现方法tryRelease尝试释放锁
      • 如果返回成功则最终会调用AQS的unparkSuccessor方法来实现释放锁的操作。
    public abstract class AbstractQueuedSynchronizer {
        
        private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
            // 获取node的等待状态,其实node为头节点
            int ws = node.waitStatus;
            if (ws < 0) {   // 1.将头节点的等待状态设置为0(无锁状态)
                compareAndSetWaitStatus(node, ws, 0);
            }
    
            // 2.获取头节点的下一个节点
            AbstractQueuedSynchronizer.Node nextNode = node.next;
            if (nextNode == null || nextNode.waitStatus > 0) {
                nextNode = null;
                // 如果下一个节点为null,或者下个节点的状态为cancel(1),则找到队列中不是cancel的节点
                // for循环从队尾开始遍历寻找,找到第一个waitStatus《=0的节点,并进行唤醒
                for(Node t = this.tail; t != null && t != node; t = t.prev) {
                    if (t.waitStatus <= 0) {
                        nextNode = t;
                    }
                }
            }
    
            // 3.如果下一个节点不为空,而且等待状态《=0,则调用LockSupport的unpark方法唤醒线程
            if (nextNode != null) {
                LockSupport.unpark(nextNode.thread);
            }
        }
    }
    
    • 解析:
      • 首先获取当前节点(实际上传入的是head节点)的状态,如果head节点的下一个节点是null,或者下一个节点的状态为cancel,则从等待队列的尾部开始遍历,直到寻找到第一个节点的waitStatus小于0的节点
      • 如果最终遍历到的节点不为null,则调用LockSupport的unpark方法,调用底层方法唤醒线程。

    相关文章

      网友评论

        本文标题:并发编程(六)ReentrantlLock实现原理

        本文链接:https://www.haomeiwen.com/subject/yktyaktx.html