美文网首页Java
ReentrantLock实现机制(CLH队列锁)

ReentrantLock实现机制(CLH队列锁)

作者: 王白告龙 | 来源:发表于2018-02-08 14:31 被阅读713次

    如何实现一个锁

    实现一个锁,主要需要考虑2个问题

    1. 如何线程安全的修改锁状态位?
    2. 得不到锁的线程,如何排队?

    带着这2个问题,我们看一下JUC中的ReentrantLock是如何做的?

    ReentrantLock锁实现

    ReentrantLock类的大部分逻辑,都是其均继承自AQS的内部类Sync实现的

    如何线程安全的修改锁状态位?

    锁状态位的修改主要通过,内部类Sync实现的

        public abstract class AbstractQueuedSynchronizer{
             //锁状态标志位:volatile变量(多线程间通过此变量判断锁的状态)
              private volatile int state;
             
              protected final int getState() {
                     return state;
              }
    
     
             protected final void setState(int newState) {
                  state = newState;
             }
    
        }
      
        abstract static  Sync extends AbstractQueuedSynchronizer {
        
            
             final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                //volatile读,确保了锁状态位的内存可见性
                int c = getState();
                //锁还没有被其他线程占用
                if (c == 0) {
                    //此时,如果多个线程同时进入,CAS操作会确保,只有一个线程修改成功
                    if (compareAndSetState(0, acquires)) {
                        //设置当前线程拥有独占访问权
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //当前线程就是拥有独占访问权的线程,即锁重入
                else if (current == getExclusiveOwnerThread()) {
                    //重入锁计数+1
                    int nextc = c + acquires;
                    if (nextc < 0) //溢出
                        throw new Error("Maximum lock count exceeded");
                    //只有获取锁的线程,才能进入此段代码,因此只需要一个volatile写操作,确保其内存可见性即可
                    setState(nextc);
                    return true;
                }
                return false;
            }
            
            //只有获取锁的线程才会执行此方法,因此只需要volatile读写确保内存可见性即可
            protected final boolean tryRelease(int releases) {
                //锁计数器-1
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                //锁计数器为0,说明锁被释放
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
        }
       
    

    从代码中,我们可以发现线程安全的关键在于:<font color="#dd0000">volatile变量</font>和<font color="#dd0000">CAS原语</font>的配合使用

    得不到锁的线程,如何排队?

    JUC中锁的排队策略,是基于CLH队列的变种实现的。因此,我们先看看啥是CLH队列

    CLH队列

    image.png

    如上图所示,获取不到锁的线程,会进入队尾,然后自旋,直到其前驱线程释放锁。

    这样做的好处:假设有1000个线程等待获取锁,锁释放后,只会通知队列中的第一个线程去竞争锁,减少了并发冲突。(ZK的分布式锁,为了避免惊群效应,也使用了类似的方式:获取不到锁的线程只监听前一个节点)

    为什么说JUC中的实现是基于CLH的“变种”,因为原始CLH队列,一般用于实现自旋锁。而JUC中的实现,获取不到锁的线程,一般会时而阻塞,时而唤醒。

    JUC中的CLH队列实现

    我们来看看AbstractQueuedSynchronizer类中的acquire方法实现

        
        public final void acquire(int arg) {
            //尝试获取锁
            if (!tryAcquire(arg) &&
                //获取不到,则进入等待队列,返回是否中断
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                //如果返回中断,则调用当前线程的interrupt()方法
                selfInterrupt();
        }
    

    入队

    如果线程调用tryAcquire(其最终实现是调用上面分析过的nonfairTryAcquire方法)获取锁失败。则首先调用addWaiter(Node.EXCLUSIVE)方法,将自己加入CLH队列的尾部。

        private Node addWaiter(Node mode) {
            //线程对应的Node
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            //尾节点不为空
            if (pred != null) {
                //当前node的前驱指向尾节点
                node.prev = pred;
                //将当前node设置为新的尾节点
                //如果cas操作失败,说明线程竞争
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //lockfree的方式插入队尾
            enq(node);
            return node;
        }
        private Node enq(final Node node) {
            //经典的lockfree算法:循环+CAS
            for (;;) {
                Node t = tail;
                //尾节点为空
                if (t == null) { // Must initialize
                    //初始化头节点
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    入队过程,入下图所示

    1 T0持有锁时,其CLH队列的头尾指针为NULL

    image.png

    2 线程T1,此时请求锁,由于锁被T0占有。因此加入队列尾部。具体过程如下所示:

    (1) 初始化头节点

    (2) 初始化T1节点,入队,尾指针指向T1

    image.png

    3 此时如果有一个T10线程先于T1入队,则T1执行compareAndSetTail(t, node)会失败,然后回到for循环开始处,重新入队。


    image.png

    由自旋到阻塞

    入队后,调用acquireQueued方法,时而自旋,时而阻塞,直到获取锁(或被取消)。

      final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    //其前驱是头结点,并且再次调用tryAcquire成功获取锁
                    if (p == head && tryAcquire(arg)) {
                        //将自己设为头结点
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        //成功获取锁,返回
                        return interrupted;
                    }
                    //没有得到锁时:
                    //shouldParkAfterFailedAcquire方法:返回是否需要阻塞当前线程
                    //parkAndCheckInterrupt方法:阻塞当前线程,当线程再次唤醒时,返回是否被中断
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        //修改中断标志位
                        interrupted = true;
                }
            } finally {
                if (failed)
                    //获取锁失败,则将此线程对应的node的waitStatus改为CANCEL
                    cancelAcquire(node);
            }
        }
        
        private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
        
          /**
         * 获取锁失败时,检查并更新node的waitStatus。
         * 如果线程需要阻塞,返回true。
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            
            //前驱节点的waitStatus是SIGNAL。
            if (ws == Node.SIGNAL)
                /* 
                 * SIGNAL状态的节点,释放锁后,会唤醒其后继节点。
                 * 因此,此线程可以安全的阻塞(前驱节点释放锁时,会唤醒此线程)。
                 */
                return true;
                
            //前驱节点对应的线程被取消
            if (ws > 0) {
                do {
                    //跳过此前驱节点
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                   此时,需要将前驱节点的状态设置为SIGNAL。
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
         //当shouldParkAfterFailedAcquire方法返回true,则调用parkAndCheckInterrupt方法阻塞当前线程
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    自旋过程,入下图所示

    image.png image.png image.png image.png image.png



    然后线程T2,加入了请求锁的队列,尾指针后移。

    image.png image.png image.png

    终上所述,每个得不到锁的线程,都会讲自己封装成Node,加入队尾,或自旋或阻塞,直到获取锁(为简化问题,不考虑取消的情况)

    锁的释放

    前文提到,T1,T2在阻塞之前,都回去修改其前驱节点的waitStatus=-1。这是为什么?
    我们看下锁释放的代码,便一目了然

        public final boolean release(int arg) {
            //修改锁计数器,如果计数器为0,说明锁被释放
            if (tryRelease(arg)) {
                Node h = head;
                //head节点的waitStatus不等于0,说明head节点的后继节点对应的线程,正在阻塞,等待被唤醒
                if (h != null && h.waitStatus != 0)
                    //唤醒后继节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
        
       
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
          
            //后继节点
            Node s = node.next;
            //如果s被取消,跳过被取消节点
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                //唤醒后继节点
                LockSupport.unpark(s.thread);
        }
    
    

    如代码所示,waitStatus=-1的作用,主要是告诉释放锁的线程:后面还有排队等待获取锁的线程,请唤醒他!

    释放锁的过程,如图所示:

    image.png image.png

    总结

    实现锁的关键在于:

    1. 通过CAS操作与volatile变量互相配合,线程安全的修改锁标志位
    2. 基于CLH队列,实现锁的排队策略

    问题讨论

    1 unparkSuccessor时为什么会出现s==null || s.waitStatus>0的情况
    2 这种情况下,为什么要通过prev指针反向查找Successor节点

    //unparkSuccessor方法的相关代码
    //释放锁时,s就是head.next
     if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
    

    原因如下:
    当释放锁的线程在执行unparkSuccessor(head)时,分情况进行分析:
    (1)s==0的情况:
    触发条件:head的successor节点获取锁成功后,执行了head.next=null的操作后,解锁线程读取了head.next,因此s==null
    反向遍历的原因:next指针在head.next处断开了,只能通过prev指针遍历
    关键代码,如图所示:


    image.png

    (2)s.waitStatus > 0的情况
    触发条件:head的successor节点被取消(cancelAcquire)时,执行了如下操作:
    successor.waitStatus=1 ;
    successor.next = successor;
    反向遍历的原因:next指针在head节点的后继节点处断开(head.next.next),因此只能通过prev指针遍历
    关键代码,如图所示:


    image.png

    相关文章

      网友评论

        本文标题:ReentrantLock实现机制(CLH队列锁)

        本文链接:https://www.haomeiwen.com/subject/zdgeoxtx.html