美文网首页javacode
ReentrantLock 可重入锁 源代码解析

ReentrantLock 可重入锁 源代码解析

作者: 九点半的马拉 | 来源:发表于2019-06-24 21:14 被阅读0次

    ReentrantLock类

    • 可重入性
      即当该子程序正在运行时,可以再次进入并执行它。如果进入同一个线程,该线程的锁的计数器就是增加1,只有等到锁的计数器降为0时才会被释放。
    • 独占锁
      java在并发模式下提供两种加锁模型,共享锁和独占锁。在独占模式下,只有一个线程可以拥有该锁,其他线程只有在该线程释放这个锁后才能申请拥有该锁。而共享锁是允许多个锁同时拥有该锁,并发访问共享资源,ReentrantLock是独占锁
    • 悲观锁
      悲观锁总是假设是最坏的情况,每次去拿数据的时候都会认为会被修改,所以每次在拿数据的时候都会上锁,别人想拿这个锁的时候就会发生阻塞现象。乐观锁总是假设最好的情况,每次去拿数据的时候都会认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法去实现。ReentrantLock是悲观锁。独占锁是一种悲观锁,它避免了读、读冲突,共享锁是一种乐观锁。
    • 公平锁与非公平锁
      是指线程在请求获取锁的过程中,是否允许插队。在公平锁中,线程会按照它们发出的请求来获得锁,而非公平锁则允许在线程发出请求后立即尝试获取锁,尝试失败才进行排队等待。ReentrantLock默认采用非公平模式

    锁的类Node

    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
                                        #因超时或中断,该线程被取消
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            #该线程的后继线程已阻塞,当该线程release或cancel要重启这个后继线程
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
                #表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
                            #传播共享锁
            static final int PROPAGATE = -3;
            volatile int waitStatus;
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
            Node nextWaiter;
            private transient volatile Node head;
            private transient volatile Node tail;
            private volatile int state;
    
    

    在上面代码中,有一个参数state,初始化为0,表示未锁定状态。A线程Lock时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获取该锁。再A线程释放该锁之前,A线程可以重复获取该锁,并执行state+1,即为可重入。但要注意的是,获取多少次就要释放多少次,这样才能保证state是能回到零态的。


    同步队列.png

    公平锁FairSync类

    FairSync主要有两个方法 lock()和tryAcquire(),
    lock方法中会调用AbstractQueuedSynchronizer类中的acquire()方法。
    AbstractQueuedSynchronizer是一个抽象类,tryAcquire()的具体方法过程在FairSync类和NonfairSync类中实现。
    与下文的非公平锁相比较,tryAcquire()多了一个判断条件hasQueuedPredecessors()
    如果hasQueuedPredecessors返回true,表示有其他线程先于当前线程等待获取锁,此时为了实现公平,保证等待时间最长的线程先获取到锁,不能执行CAS

     public final void acquire(int arg) {
            //尝试获取锁
            if (!tryAcquire(arg) &&
                //获取不到,则放到等待队列中去,返回是否中断
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                //返回中断,调用该方法
                selfInterrupt();
        }
    
    static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                //获取当前线程
                final Thread current = Thread.currentThread();
                //获取同步状态
                int c = getState();
                //没有线程获取锁
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //判断当前线程是否是获取锁的线程
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
    public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    上面用到了compareAndSetState(0, acquires)方法,是一种CAS(compare and swap),即比较和替换
    CAS(compare and Swap)

    <font color = "red" >该部分内容引用了简书大神占小狼的内容https://www.jianshu.com/p/fb6e91b013cc </font>
    它有三个参数,一个为当前内存值V,旧的预期值A和即将更新的值B,当且仅当旧的预期值A和内存值V相等时,才能将内存值修改为B并返回true,其他情况不执行任何操作并返回false.
    Unsafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。

    protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this,
             #变量stateOffset表示该变量值在内存中的偏移地址,以为Unsafe就是根据内存偏移地址获取数据的
             stateOffset, expect, update);
      }
    
    
    • 使用场景
      使用于简单的对象的操作,比如布尔值,整型值
      适合冲突较少的情况,比如太多线程在同时自旋,那么长时间自旋会导致CPU开销很大
    • CAS的ABA问题
      ABA问题就是如下所示:
    1. 线程1获取该位置的数值为A,之后执行CAS操作,期望值为A,只有当前的值为A才能被修改
    2. 线程2将数据修改为B
    3. 线程3将数据改为A
    4. 线程1检测当前的值为A,与预期值相同,则执行操作。
      在上述并发条件下,线程1最后执行了修改操作,但此时的A与初始的A不一样了,中间经历了其他修改,CAS就是防止在执行操作时预防有别的线程在操作。
    • CAS的ABA问题的解决办法
      新增了AtomicStampedReference类来解决该问题,为值增加了修改版本号,每次修改时,都会修改其版本号。首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
    public class AtomicStampedReference<V> {
        private static class Pair<T> {
            //对象引用
            final T reference;
            //标志版本
            final int stamp;
            private Pair(T reference, int stamp) {
                this.reference = reference;
                this.stamp = stamp;
            }
            static <T> Pair<T> of(T reference, int stamp) {
                return new Pair<T>(reference, stamp);
            }
        }
        # expectedReference:更新之前的期望值
        # newReference 要更新的新值
        # expectedStamp 期待更新的标志版本 newStamp: 将要更新的标志版本   
        public boolean compareAndSet(V   expectedReference,
                                     V   newReference,
                                     int expectedStamp,
                                     int newStamp) {
            Pair<V> current = pair;
            return
                expectedReference == current.reference &&
                expectedStamp == current.stamp &&
                ((newReference == current.reference &&
                  newStamp == current.stamp) ||
                 casPair(current, Pair.of(newReference, newStamp)));
        }
    

    非公平锁NonfairSync类

    非安全锁在上锁时会先通过compareAndSetState(0, 1) 判断如果没有线程拥有该锁时就直接添加到该线程中
    如果有线程拥有该锁的话,就调用acquire()方法,同上面的acquire()方法一致

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
       }
    
    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    同步队列添加元素

    同步队列添加元素.png

    先获取同步队列的尾结点,然后将插入的结点的前驱结点连接到原尾结点,之后通过CAS方式设置同步结点的尾结点的地址为插入的结点位置。当尾结点为空或者设置tail指向新插入的结点失败时执行enq方法。

    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
     private Node enq(final Node node) {
           //自旋操作
            for (;;) {
                Node t = tail;
                //如果尾结点为空
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    //通过CAS添加到尾部
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    

    锁的释放

    首先调用unlock方法,然后调用AbstractQueuedSynchronizer类中的release()方法
    调用tryRelese()方法释放当前线程并唤醒启动等待队列里的头结点

    public void unlock() {
            sync.release(1);
        }
    
     public final boolean release(int arg) {
            #尝试释放当前线程
            if (tryRelease(arg)) {
                #启动等待线程队列里的头结点,h为当前执行的线程,在unparkSuccessor方法中会唤醒当前线程的后继结点
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    在tryRelease()方法中只有当线程的state为0时才会释放该线程,否则说明是重入锁,需要多次释放将state改为0才能该线程释放掉

     protected final boolean tryRelease(int releases) {
                                   #线程计数器减1
                int c = getState() - releases;
                               #只有锁的拥有者是当前线程时才能继续执行,否则会抛出异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    
    /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                #唤醒该进程
                LockSupport.unpark(s.thread);
        }
    

    才疏学浅,文章如果有不足请多多指教
    该文章也发布在了我的个人博客 https://spurstong.github.io

    相关文章

      网友评论

        本文标题:ReentrantLock 可重入锁 源代码解析

        本文链接:https://www.haomeiwen.com/subject/ikeaqctx.html