美文网首页
ReentrantLock可重入锁之非公平锁实现原理

ReentrantLock可重入锁之非公平锁实现原理

作者: Gorden_Tam | 来源:发表于2018-10-16 17:00 被阅读0次

    一.ReentrantLock概述

    ReentrantLock是基于AQS(AbstractQueuedSynchronizer)实现的,AQS是并发包的基础,CountDownLatch,FutureTask,Semaphore等都是基于AQS实现的。
    

    二.AQS简介

    AQS是基于FIFO队列(CLH队列)实现的,因此存在一个个节点,节点里面主要有:
    
            //表示node处于共享模式
            static final Node SHARED = new Node();
            //表示node处于独占模式
            static final Node EXCLUSIVE = null;
    
           //因为超时或者中断,node被设置为取消状态,被取消的node不能去竞争锁,只能保持取消状态不变,处于这种状态的node会被提出队列,被gc回收。
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            //表示这个node在条件队列中
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    

    三.ReentrantLock的实现

    ReentrantLock中有一个抽象嵌套类Sync,两个嵌套类NonfairSync和FairSync继承Sync实现lock()和tryAcquire()方法分别对应公平sync和非公平Sync。ReentrantLock中主要的嵌套类和API如下图。


    image.png

    ReentrantLock默认为非公平锁,可以通过在构造函数中传入true来定义一个公平锁实例:

        public ReentrantLock() {
            sync = new NonfairSync();
        }
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    下面来看一下非公平锁的加锁过程,假如实例一个非公平锁,线程一调用lock()方法

    public void lock() {
            sync.lock();
        }
    

    lock()方法在公平Sync和非公平Sync中实现不同,下面是非公平的Sync中lock的实现:

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    

    compareAndSetState定义在AQS类中,使用cas操作更新state的值为1.

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    //stateOffset是state的地址偏移量,会在类加载时候赋值。
        static {
            try {
                stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
    
            } catch (Exception ex) { throw new Error(ex); }
        }
    

    由此可知线程一获取锁的时候做两件事:
    1.使用cas操作将AbstractQueuedSynchronized的state更新为1。
    2.将AOS的thread设置为当前线程。

    此时线程2执行调用lock方法时cas操作将会失败,因为期望值已经被线程一更新为1。则线程二会执行acquire方法。

    //acquire方法定义在AQS类中,tryAcquire为抽象方法,需要实现AQS的类重写该方法。
        //以独占的方式尝试获取锁,忽略中断,tryAcquire方法需子类自己实现,如果尝试获取锁失败,则将节点入队
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    //NonfairSync中
    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    

    先来看看Sync中尝试获取锁的函数:

    //最终会调用Sync中的nonfairTryAcquire方法
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                //获得state
                int c = getState();
                //再次cas,如果没有线程占用锁,则线程二可以独占锁
                //这地方有第一个疑问,有没有可能线程一占用锁,此时线程二执行到这段代码,获得c的值为1,这个时候线程一释放了锁,state更新为0,但线程二无法进入第一个if,第二个else if也无法进入,只能返回失败。
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //否则判段持有锁的是否为当前线程,如果当前线程正在占有锁,则将state值+1.
                //这地方有个第二个疑问,有没有可能当前线程在获得state的值并赋值给c后,其他线程更新了这个state的值,这样nextc得到的是不准确的值。后来想了想不可能,因为若是其他线程持有锁,则不会进入else if这个判段,若是当前线程持有锁,则调用lock()方法后正在执行这段代码的只有本线程,不可能有别处修改state的值。所以使用getState()获得的state值是准确的。
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    nextc小于0时抛出一个error,说明线程最大可重入次数为Integer.MAX_VALUE
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                
                return false;
            }
    

    再来看看addWaiter方法:

    //以当前线程和给定的模式新建并入队节点,模式分为独占模式和共享模式。
        private Node addWaiter(Node mode) {
            //因为是独占模式,所以传入的mode为null
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            //这句话的意思为先执行一次cas操作尝试以快速的方式将节点入队,如果失败了的话,则执行enq入队方法,循环进行cas入队操作。
            //获取队列的尾节点
            Node pred = tail;
            //如果尾节点为非空,说明队列中已经有节点,则将新入队结点前一个节点设置为队列中的尾节点,cas更新AQS的尾节点tail为node。
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //如果队列为空或者cas失败,则执行enq。
            enq(node);
            return node;
        }
    
    //使用cas将tail指向新入队的节点。
        private final boolean compareAndSetTail(Node expect, Node update) {
            return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
        }
    

    如下为enq方法:

        private Node enq(final Node node) {
            for (;;) {
                //如果队列为空,则初始化头结点尾节点,将头节点设置为一个傀儡node
                //这里头尾节点都是延迟初始化,在出现竞争有节点入队时才初始化。
                Node t = tail;
                //如果队列为空,则先将头尾节点指向一个没有关联线程的傀儡节点
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    如上这段代码揭示了CLH队列究竟是什么样子,如下图。


    未命名文件7.png

    由此可见CLH队列为一个双向链表,头结点为一个傀儡节点,尾节点的引用指向队列中最后一个node。
    这里还有一个疑问,就是为什么要使用cas将tail指向新入队节点呢?待明确

    再来看看acquireQueued这个函数

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                     //如果队列的前驱结点为头结点,说明当前节点在队列头,则再次执行tryAcquire方法尝试获取锁。
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //更新node的waitStastus
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        //将当前线程的状态变更为等待状态。
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    再来看看shouldParkAfterFailedAcquire这个方法,这个方法的主要目的用来更新节点的waitstatus。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            //如果这个节点的waitStatus已经
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            //如果waitStatus为CANCEL,说明其前驱节点对应的线程被中断,跳过这个节点
            if (ws > 0) {
                 //循环,只要前驱结点的waitStatus为CANCEL,则继续向前找,将这些CANCEL的节点从链表上断开,后续会被gc回收。
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    这段代码的主要目的是将入队了的节点的前驱的waitStatus,如果其前驱节点的waitStatus为0,则cas更新为SIGNAL,如果为CANCAL,则将其从链表上断开。可以结合下面情形来理解:
    现在假设线程一、线程二和线程三竞争锁,线程一先竞争锁,线程二的node先入队,进入acquireQueued这个方法后,先tryAquire()尝试能否获得锁,不能则进入shouldParkAfterFailedAcquire(),线程二的前驱节点为head,其waitStatus=0,cas更新其为SIGNAL-1。然后线程三入队,将线程二对应的node的waitStatus更新为-1,并调用lockSupport.park方法阻塞当前线程。最终结果如下图
    
    未命名文件7.png

    综上所述,可以知道整个加锁过程中发生了什么操作:当有竞争发生时,整个操作如下:首先cas更新state值为1,并将占用锁的线程设置为当前线程。如果cas失败,则再次获取state的值,如果state的值为0则说明竞争已解除再如上进行一次cas操作更新state为1。如果不为state不为0,则判段占用锁的是否为当前锁,如果是,则state的值+1。如果不是,则将当前线程入队,入队时每次节点插入到队列的末尾,然后使用cas更新尾节点指向新入队的节点。然后修改节点前驱节点的waitStatus为SIGNAL,并将当前结点阻塞。从而完成整个加锁过程

    那么解锁过程又发生了什么呢?
    
        public void unlock() {
            sync.release(1);
        }
    
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                如果队列中有node的话,那么head的waitStatus为-1.
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    其中release()方法定义在AQS类中,tryRelease方法需要继承AQS的类自己实现。

            protected final boolean tryRelease(int releases) {
                //这里和state的累加
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    因为调用一次lock()就要相对应的调用一次unlock(),所以没调用一次unlock(),state值-1,当state为0时,调用setExclusiveOwnerThread将占有锁的线程置为空。表明已释放占有的锁。然后接着判段队列的head是否为null,waitStatus是否<0,如果head不为null并且waitStatus<0时,说明队列中存在node,则需要调用unparkSuccessor方法唤醒队列第一个node对应的线程。FIFO就体现在这里。

     private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            //这里的node为头结点,将头结点的waitStatus设置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            //如果队列的头节点的线程已经中断了,那么需要从队列尾向前寻找
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    被唤醒的线程将继续执行,该被阻塞线程是调用parkAndCheckInterrupt()阻塞的,代码仍在for循环中,将继续执行。将进入第一个if判段,此时tryAcquire可以成功更新state的值,将会返回true,第一个if成立,将头节点指向下一个队列的下一个节点。被唤醒的线程对应的node将会从双向链表中断开并最终被gc回收。

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        //此时被唤醒的线程对应的节点已经出队,将被gc回收
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    看到这里不得不赞叹Doug Lea的鬼斧神工。

    故回顾整个解锁过程,首先会cas更新state,没调用一次unlock就会将state的值-1。当state为0时,表示锁已释放,此时将队列中的第一个节点对应的线程唤醒,被唤醒的线程可以成功占有锁,并将该线程从出队,head指向队列中新的第一个节点。

    如上为整个ReentrantLock非公平所的加锁解锁过程,公平锁的加锁过程会在之后更新。

    相关文章

      网友评论

          本文标题:ReentrantLock可重入锁之非公平锁实现原理

          本文链接:https://www.haomeiwen.com/subject/oeupaftx.html