美文网首页一起读源码
Java中的“锁”事之ReentrantLock

Java中的“锁”事之ReentrantLock

作者: MR丿VINCENT | 来源:发表于2020-08-18 09:29 被阅读0次
    keyboard

    谈谈“锁”

    说起Java的锁,脑袋里第一反应就是关键字synchronized.这是Java提供的基于语言级别的锁,底层是通过cup指令来实现的。对于使用者来说非常简单,容易上手。然而也有一些小缺陷。在早期的jvm中synchronized性能不是太好,而且加锁和释放锁不是很灵活,比如只能在程序正常执行完成和抛出异常时释放锁,对锁的持有很“执着”,获取锁的时候没法设置超时时间等。

    除了jvm层面实现的锁之外,JDK中也提供了另外的锁实现。下面从一个例子说起。

    ReentrantLock

    public void test0() throws InterruptedException {
            ReentrantLock lock = new ReentrantLock();
    
            Thread t1 = new Thread(() -> {
                boolean b = lock.hasQueuedThreads();
                System.out.println("t1"+ b);
                lock.lock();
                System.out.println("t1 start working...");
                try {
                    for (int i = 0; i < 10; i++) {
                        System.out.println("t1 do working...");
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
    
            Thread t2 = new Thread(() -> {
                boolean b = lock.hasQueuedThreads();
                System.out.println("t2"+ b);
                lock.lock();
                System.out.println("t2 start working...");
                try {
                    for (int i = 0; i < 10; i++) {
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("t2 do working... ");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
            
    
            Thread t3 = new Thread(() -> {
                boolean b = lock.hasQueuedThreads();
                System.out.println("t3"+ b);
                lock.lock();
                System.out.println("t3 start working...");
                try {
                    for (int i = 0; i < 10; i++) {
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("t3 do working... ");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
    
            t1.start();
            t2.start();
            t3.start();
            t1.join();
            t2.join();
            t3.join();
            System.out.println("++++finished++++");
        }
    

    很容易看出,demo中使用了ReentrantLock来作为锁来对三个线程进行协调,确保三个线程顺序执行。使用方式也很简单:在需要保护的代码前后使用lockunlock即可。

    既然ReentrantLock能提供和synchronized一样的锁机制,那必须得看看到底这个“锁”有什么黑魔法。

    ReentrantLock和AbstractQueuedSynchronizer之加锁

    加锁其实是一个很容易理解的过程,其中我认为有点绕的是node结点之间链的摘除和建立,毕竟数据结构的基础还是比较弱,稍微多绕几圈就被整蒙圈了。
    在研究AQS锁实现之前得聊一下什么是“公平”和“非公平”锁。所谓公平锁遵循先来的先获得锁,翻译成白话就是大家都是在排队的;而非公平锁则反之,只要有获取锁的机会,那就不顾一切去抢,不排队。

    ReentrantLock默认的实现为非公平锁。理论上来说非公平锁比公平锁效率更高。当然也可以通过指定参数来区分是否使用公平锁。

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    // ReentrantLock中的lock方法其实
    public void lock() {
        sync.lock();
    }
    
    static final class NonfairSync extends Sync {
        //...
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();
        //...
    }
    

    NonfairSync作为内部类继承自Sync,而Sync继承自AbstractQueuedSynchronizer
    说白了其实就是个模版方法,AQS提供基础实现,子类根据自己需要去自定义不同的逻辑。

    接下来根据demo中的几个关于锁的基本操作(lock)来看看其实现细节。

    首先lock方法中的compareAndSetState(0, 1)语义是如果当前的值为0,那就更新为1.这是一个基于cpu指令的原子操作。

        /**
         * The synchronization state.
         */
        private volatile int state;
        /**
         * The current owner of exclusive mode synchronization.
         */
        private transient Thread exclusiveOwnerThread;
        
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    

    如果更新成功,那就返回true。而这个原子更新的字段为AQS的state。这个字段简单理解为获取锁的标志,整个锁的核心都是围绕着这个字段来完成的。
    如果更新成功,那么将当前线程置为exclusiveOwnerThread。这个变量表示当前持有锁的线程。
    完整的语义即:当某个线程中的逻辑调用lock方法后,lock对象中的state字段由0更新为1,当前线程持有锁。
    那这个线程没执行完操作,还没释放掉锁,后续的线程怎么办?

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // AQS中的实现 必须得由子类重写
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // NonfairSync中的重写
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    如果在某个线程获取到了锁之后还没释放,其他线程也执行到lock方法,这时候由于lock对象中state为1,因此没办法更新,所以执行acquire逻辑。而acquire调用nonfairTryAcquire方法。
    首先获取state的值,在我们的demo中由于之前的线程没有释放掉锁,这里的c的值为1,而当前线程和lock对象中持有的线程不一样(getExclusiveOwnerThread返回之前持有锁的线程对象)因此这里直接返回false。
    当线程中执行的任务很短的时候,短到几纳秒,获取到锁的线程马上释放掉了。这个state值从1变成了0,这里其他线程就有机会再次去“争夺”一次锁,同样使用cas操作将state值从0到1,同时将当前线程置为lock对象的exclusiveOwnerThread字段。最后返回true,表示获取到了锁。
    还有一种情形,一个线程多次去lock,这里lock对象中持有的线程锁同一个线程,因此进入到current==getExclusiveOwnerThread()逻辑。做法也很简单,将state再加1即可,这个线程依旧能获取到锁。这就是所谓的可重入(Reentrant),即可以多次获取一个锁。

    tryAcquire方法返回为真时,表示当前线程成功获取到了锁,整个lock逻辑已经完成,后面的acquireQueued方法就直接忽略掉。
    这里小结一下:

    • AQS使用state变量来标记锁是否被线程获取,使用变量exclusiveOwnerThread标记获取锁的线程;
    • 锁可以被多次获取,这样的锁叫做可重入锁(Reentrant),通过state标记获取锁的次数,同理锁被获取多少次就得释放多少次,不然锁不会被释放;

    接下来看看acquireQueued方法的实现。上面说道,当尝试获取锁成功的时候,lock方法就结束了,如果尝试获取锁失败呢?如果失败就进入到acquireQueued方法:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    

    其实这里是两个方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg),先调用addWaiter

    
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    看到这里,出现了新的数据结构Node,为了更加方便理解,现在不得不对这个数据结构进行说明。

    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    
            volatile int waitStatus;
            
            volatile Node prev;
    
            volatile Node next;
    
            /**
             * The thread that enqueued this node.  Initialized on
             * construction and nulled out after use.
             */
            volatile Thread thread;
    
            Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    首先是俩静态变量,这个变量仅仅是一个标记,并没有实际用途:

    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;
    

    因为AQS有两种模式:独占和共享。独占模式例如demo中的ReentrantLock,而共享模式如并发工具包中的CountDownLatch。接着就是waitStatus变量,同时指定了几个枚举。然后就是thread当前线程,以及前驱后继结点。不难看出这是一个双端链表结构。nextWaiter字段暂时按下不表。

    继续看addWaiter方法,由于传入的mode为Node.EXCLUSIVE,因此这里创建的node的nextWaiter字段的值为null,将当前要获取锁的线程也放进node里,然后尝试去“操作”这个node。实际上就是看这个AQS中node队列除了当前创建的还有没有别的:

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;
    
    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    这里的tailhead都是AQS中的变量,用于操控node链表。他们的更新都是使用cas实现的,保证原子性。如果AQS中没有node链表(没有形成),head和tail都是null,直接走enq逻辑,然后将新创建的这个node返回,如果AQS中有结点存在呢,那就直接将创建的node变成tail。compareAndSetTail(pred, node)的语义为,如果当前tail的值为pred,那么将其更新为node。然后修改后继指针,返回node结点。

    再看看AQS中的结点为空的时候:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    逻辑很清晰,这里进一步判断了一下tail是否为空,如果是真为空,那就新建一个node结点作为头结点,同时将tail指向头结点。这时候头结点就是尾结点,且结点内没有数据,只是作为一个标志而已。然而并没有返回,因为是个死循环,头尾结点初始化成功之后,继续走else逻辑,同理将新创建的结点的前驱指向刚才新建的空结点,然后把tail指向自己(Node node = new Node(Thread.currentThread(), mode);)的结点。最后修改后继指针并返回。这个逻辑看起来比较绕,尤其是指针的操作让人眼花缭乱,通过画图会更容易理解。总结一句话就是:创建node链表,初始化tail和head指针,且head指针指向的是一个空node(仅仅有意义的事waitStatus=0,因为没给值,默认就是0)。而返回的新创建的node作为acquireQueued参数:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    核心逻辑又是一个死循环,首先获取刚才创建的node结点的前驱结点,如果前驱结点为head结点(空的结点),可以再给这个线程一次机会,尝试获取锁。tryAcquire之前说过,这里不赘述。如果运气好,获取到了(state从0到1)返回true,将当前结点设置为head,同时摘除链表关系,也就是那个空结点被释放了,这个时候head结点可不是空结点了,而是Node node = new Node(Thread.currentThread(), mode);创建出来的。最后返回false,获取锁成功。为什么要来这一出呢?因为如果之前获取锁的线程执行任务的时候,其他线程在尝试着排队的时候还是有机会去抢一下的,说不定哪一瞬间任务结束释放了锁其他线程刚好抢到了呢?当然这也是有前提的,当线程决定去排队,且是排第一个的时候才能有多一次机会去抢锁。这里有疑问了,这个不是非公平的么?为啥还得排第一个才能抢?其实并不矛盾,因为每个线程都至少有一次机会去抢锁,通过tryAcquire。只有没抢到的,打算排队的,排到第一个的线程有第二次机会。当然,就算某个线程排第一,多一次抢锁机会,也不一定必然抢到呀,因为别的线程依旧和这个线程一样,同样是通过tryAcquire来抢的,因此是公平的,严格来说不公平,因为排第一的线程多了一次机会。

    如果这个排第一的倒霉鬼还是没获取到锁,那就很难受了。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    这里我们先考虑前驱结点为空结点的情况,之前提到,空结点的waitStatus没有赋值,默认为0,因此这里直接走compareAndSetWaitStatus(pred, ws, Node.SIGNAL);逻辑,将其置为-1;最后返回false,然而for死循环的逻辑还没结束,还会继续尝试获取一下锁,如果还是没获取到,那就再次进入到shouldParkAfterFailedAcquire中,因为第一次循环中将其waitStatus从0设置为了-1,因此这里直接返回true,所以,当在ASQ内部中“排队”的线程数第一个,是有两次次额外的获取锁的机会的。
    接着就是parkAndCheckInterrupt逻辑了:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    park为停车的意思,这里理解为挂起也没太大毛病。说白了就等着呗,等到什么时候为止呢?那就得从unlock说起了。

    ReentrantLock和AbstractQueuedSynchronizer之释放锁

    demo中的解锁方法unlock对应的实现逻辑为release

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    类似地,先尝试去释放一下:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    先把state值减回去,判断一下当前的线程是不是AQS中锁持有的线程,如果不是那就说明有问题。如果当state还原为0了说明锁被释放掉了,同时将当前AQS持有的线程置为空。最后将当前state值更新(更新为减1后的,这里并不一定是0)。正如上文提到过,如果加多次锁,那么也得释放多次。如果没获释放掉,那就说明当前锁依旧被持有。

    如果更新state成功,那么还需要做的一件事就是处理node结点。如果AQS中的头结点不为空,且状态不是默认的初始化的0,那么就去唤醒后继结点:

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    这里逻辑也十分清晰,先更新结点的waitStatus,将其置为0.然后找到后面的结点,如果不是空,那就将其唤醒,和之前是park一一对应。这里还多了一段判断锁被取消的情况,注释中也写得很清晰,意思就是从node链表的尾部开始找,一直找到符合要求的结点将其唤醒。

    唤醒了还没完,因为等待锁的线程被park了后还得继续执行后续的逻辑。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 锁释放之后 这里会继续执行
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    parkAndCheckInterrupt返回的值为当前线程的中断状态,如果当前(获取锁的)线程被设置了中断标记,那么这个方法就直接返回true。即interrupted=true
    由于是死循环,同样的当前线程(被唤醒的)获取一下锁,因为AQS中的state已经还原了,所以这里能拿到,将当前结点设置为头结点,获取锁完成。由于可能存在当前获取锁的线程由于某种情况被设置了中断标记,那么就将其中断(也只是设置中断标记)。

    相比获取锁的操作,释放锁容易很多。

    小结

    本文基于ReentrantLock锁的基础实现,对AQS的大致原理进行了比较粗略的分析。如AQS的底层结构,核心的API等。通过锁的基础操作,如加锁和释放锁背后的逻辑进行了详细解读。当然还有很多没有涉及到的地方,如条件队列,共享模式的实现,公平和非公平的体现等。当知道了AQS的原理之后,去理解这些主题也是非常轻松的。总的来说,AQS的代码量不算太多,读起来不是很吃力。

    PS:在云笔记中发现18年的时候也写过一篇AQS的文章,现在居然一点印象都没有了,时间啊,是真残酷。2年前的笔记

    参考资料

    Java并发锁框架AQS(AbstractQueuedSynchronizer)原理从理论到源码透彻解析

    Java里一个线程调用了Thread.interrupt()到底意味着什么?

    相关文章

      网友评论

        本文标题:Java中的“锁”事之ReentrantLock

        本文链接:https://www.haomeiwen.com/subject/xqdhjktx.html