美文网首页
ReentrantLock 简单源码分析

ReentrantLock 简单源码分析

作者: 听歌闭麦开始自闭 | 来源:发表于2019-04-02 17:59 被阅读0次

    基于Java 8进行源码分析

    1.ReentrantLock

    1.1. lock()

    // 非公平锁
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    // 公平锁
    final void lock() {
        acquire(1);
    }
    

    1.2. acquire(int)

    这个方法是一个综合操作,目的是为了让线程获取到锁。大致逻辑:
    1.尝试让当前线程获取锁,如果成功直接返回,如果失败就执行下一步。
    2.将没有获取到锁的线程封装为节点存储在阻塞队列中,新来的都呆在队列的后面。
    3.如果当前线程位于head.next的位置,尝试获取锁,获取到就直接返回,如果获取不到就和队列中其他位置的线程一样进入阻塞状态。
    4.当阻塞的线程被唤醒时(例如unlock()被调用),然后再次重复上一步。(unlock()只会让一个线程退出阻塞状态,不会导致惊群)

    其中还有一些对中断逻辑的处理
    *一般情况下,如果线程进入阻塞状态前,中断标志就已经为true,那么中断的逻辑在进入阻塞状态后就会被触发(因为只有进入Safepoint,线程才能感知到中断标志的变化)。
    *那么在lock()方法的调用过程中,如果对队列中没能获取到获取到锁的线程进行中断的话,在它进入阻塞状态后,AQS会感知到这个操作并暂时清除中断标志,在目标线程成功获取到锁后,AQS会恢复中断标志为true,方便用户自己处理中断的逻辑。
    *AQS暂时清除中断标志的原因是为了能让AQS内的自选逻辑继续执行,这是lock()的执行逻辑;如果使用的是lockInterruptibly()的话,在这个暂时清除中断标志的逻辑中,会直接抛出中断的异常。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // addWaiter里,设置的默认waitStatus为null
            selfInterrupt();
    }
    

    1.2.1. tryAcquire(int)

    tryAcquire(int)方法的逻辑比较简单,主要行为是尝试一次获取锁,公平锁和非公平锁的区别在于:
    公平锁在获取锁之前会看看有没有线程比它等待时间久;非公平锁则是直接尝试获取。

    // 非公平锁的实现
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {  // 差别点
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 这是对重入的操作
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    
    // 公平锁的实现
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {  // 差别点
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    1.2.2. addWaiter(Node.EXCLUSIVE)

    这个方法的目的是:对于没/不能插队的lock操作,将它对应的线程添加到AQS的阻塞队列中,等待合适的执行时机。

    其中一共有两个行为: (阻塞队列是一个双向链表)
    1.初始阻塞队列
    这里会创建一个既是head又是tail的空内容节点
    2.向阻塞队列尾部链接新节点

    // AbstractQueuedSynchronizer#acquire(int) 中的代码片段
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    
    // AbstractQueuedSynchronizer
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); // 新建一个节点,当前发起lock操作的线程作为新节点
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 如果尾部节点不为null,把新建的节点链接到队列当前尾部节点后面。
        if (pred != null) {
            node.prev = pred;
            // 尝试完成双向链接行为。
            // 通过CAS操作保证并发时的结果正确,如果CAS操作失败会进入下面的enq(node)方法。
            // 代表节点由于并发问题,没能成功添加到队列中。
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 通过自旋,把当前节点添加到阻塞队列的尾部
        enq(node);
        return node;
    }
    
    // AbstractQueuedSynchronizer
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail; // tail是被volatile修饰的
            if (t == null) { // Must initialize
                // 初始化队列,把一个空内容的Node作为队列的head和tail
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 再次尝试将当前新建的节点链接到队列当前尾部节点后面。
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t; // 这里返回的是当前节点的前一个节点
                }
            }
        }
    }
    

    1.2.3. acquireQueued(Node, int)

    1.让队列中位于head.next的线程尝试获取锁。
    2.确认当前节点是不是应该进入阻塞状态
    如果在第1步中的节点获取到了锁,那这个节点就不用走第2步。
    对于获取到锁的线程,会把这个线程对应的节点作为哨兵,变相相当于把它清除出阻塞队列了。

    Thread 的 interruptinterruptedisinterrupted
    interrupt: 设置当前线程的中断标志为true,不会立即执行中断,只有在Safepoint时线程才会知道中断标志的值是什么。
    interrupted: 检测当前线程的中断标志,同时把值设置为false。 (这个是Thread的静态方法)
    isinterrupted: 检测当前线程的中断标志。

    LockSupport API文档
    LockSupport.park(this)的作用是让当前线程进入阻塞状态,但不会释放当前线程占有的锁资源,如果检测到中断,会立即退出阻塞状态。
    LockSupport.unpark(Thread)可以让指定线程退出阻塞状态。

    // AbstractQueuedSynchronizer
    final boolean acquireQueued(final Node node, int arg) { // 在上文中可以看出arg的值为1
        boolean failed = true; // 是否没有获取到锁,true代表没获取到
        try {
            boolean interrupted = false;
            // 下面这些行为是对每个线程都有效的,相当于节点的自旋自检
            for (;;) {
                final Node p = node.predecessor(); // 尝试获取当前节点的上一个节点,如果为null会导致抛出异常
                // 1. 如果当前节点的上一个节点是head
                // 2. 当前尝试一次获取锁 (获取到会返回true,失败会返回false)
                // 同时满足上面两点才会进入if代码块,这也意味着只有位于这个head后的节点才会尝试去获取锁,
                // 如果if判断成立,则代表当前线程成功获取到了锁。
                if (p == head && tryAcquire(arg)) {
                    setHead(node); // 把当前获取到锁的节点作为head 并 清除它的prev和thread属性值,变相相当于清除出队列了
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 确定当前线程是否应该被阻塞
                // 如果当前线程对应的等待状态为0,那下次获取锁失败后才会被阻塞,并且等待状态会被设置为-1。
                // 如果当前线程对应的等待状态为-1,那线程直接阻塞。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果进入这里,代表当前线程的中断标志为true
                    interrupted = true;
            }
        } finally {
            if (failed) // 在获取锁的过程中出现异常,导致当前线程没有获取到锁,但自旋中断了
                cancelAcquire(node); // 综合逻辑中意味着将当前节点移出阻塞队列
        }
    }
    
    // AbstractQueuedSynchronizer
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //上一个节点的等待状态,这个waitStatus和Condition的操作有关。
        int ws = pred.waitStatus;
    
        // 经过下面的处理,可以看出来这样的一个逻辑,上一个节点的等待状态决定了当前节点是否需要被阻塞。
    
    
        // 如果是SIGNAL,也就是-1的话,当前线程进入阻塞状态,类似sleep(),都不会释放线程所占有的锁资源
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 如果上一个节点的waitStatus大于0,就不断向前寻找,直至找到一个等待状态小于等于0的节点,
        // 然后在这个节点和当前节点之间建立双向链接,整个过程意味着把两者之间waitStatus > 0的节点清除出阻塞队列了。
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            // 多线程操作下也不会出现问题,因为每个节点都只会找自己之前的第一个 waitStatus 小于等于0的数据
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // 额外情况(例如为null),就把上一个节点的等待状态设置为SIGNAL,也就是-1
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    // AbstractQueuedSynchronizer
    private final boolean parkAndCheckInterrupt() {
        // 阻塞当前线程,但不会释放当前线程占有的锁资源
        LockSupport.park(this);
        // 检测当前线程的中断标志,并清除该标志,如果不清空会导致下次自旋时无法进入阻塞状态。
        // 如果线程在进入阻塞状态前就已经被标记为中断状态的话,阻塞行为会立即中止执行这里的return操作。
        return Thread.interrupted();
    }
    
    waitStatus 代表含义
    PROPAGATE -3 当前节点后续的共享锁可以获取
    CONDITION -2 当前节点在condition队列中
    SIGNAL -1 当前节点的next需要unpark
    未命名 0 当前节点刚进入阻塞队列,还没有进入阻塞状态
    CANCELLED 1 当前的节点被取消

    1.3. unlock()

    让当前线程的下一位的线程退出阻塞状态,并不会引起惊群。

    public void unlock() {
        sync.release(1);  // 释放1层锁,也就是让state减1
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 尝试释放指定层锁,只有当state为0是才会进入if代码块
            Node h = head;
            if (h != null && h.waitStatus != 0) // 在本文中展示的调用链里,进这里的只有-1
                unparkSuccessor(h); // 对head.next位置上的线程进行unpark
            return true;
        }
        return false;
    }
    

    2. Condition

    这东西和Object#wait()Object#notify的用法差不多

    // 使用示例
    
    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();
        condition.await();
        condition.signal();
        condition.signalAll();
    }
    

    await()或者wait()要确保在获取到锁的代码块内编写,否则会报错。

    2.1. await()

    简单的过一遍代码,可以发现,await()会把当前线程放到条件队列中,同时会释放持有的锁资源,然后把线程阻塞住。

    // AbstractQueuedSynchronizer
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) // 对中断的先行检测
            throw new InterruptedException();
        // 把当前线程加入到条件队列中,其中waitStatus默认为-2,之前在说lock()的逻辑时,那里面存的是null
        // 条件队列不像阻塞队列,它里面是没有哨兵的。
        Node node = addConditionWaiter();
        // 释放当前线程掌握的锁资源
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) { // 如果不在阻塞队列上
            LockSupport.park(this); // 阻塞当前线程
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // ... 省略部分代码
    }
    

    2.2. signal() & signalAll()

    可以看出来这两个方法的逻辑是相似的,都是在调用时把条件队列中的节点从前到后的移动到阻塞队列中,只不过前者只移一个,后者全部移过去。
    在常规使用中,这两个方法在调用过程中并不会触发unpark方法,也就是说不会让线程退出阻塞状态,想要让线程退出阻塞状态,还得靠unlock()方法。

    // signal()
    
    // AbstractQueuedSynchronizer
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter; // 获取条件队列上的第一个元素
        if (first != null)
            doSignal(first);
    }
    
    // AbstractQueuedSynchronizer
    private void doSignal(Node first) {
        // 代码块里面的逻辑: 把条件队列中的第一个节点从条件队列中移除,并添加到阻塞队列中,
        // 并且把自己的waitStatus从-2改为0,把在阻塞队列中前一位的waitStatus改为-1
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    // AbstractQueuedSynchronizer
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 把waitStatus从-2变为0
            return false;
    
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node); // 把当前的节点放入到阻塞队列中,返回值是原来的tail,不是当前节点,别搞错
        int ws = p.waitStatus;
         // 尝试把前一个节点的waitStatus修改为-1,普遍都能执行成功
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 一般情况下,不会执行if里面的代码
            LockSupport.unpark(node.thread);
        return true;
    }
    
    // signalAll()
    
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            // 代码块里面的逻辑: 把条件队列中的全部节点从条件队列中移除,并添加到阻塞队列中,
            // 并且把自己的waitStatus从-2改为0,把在阻塞队列中前一位的waitStatus改为-1
    
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
    

    公平锁 & 非公平锁

    非公平锁在调用lock()方法时,会优先进行CAS操作;公平锁只有在队列中没有比它等得久的线程时才会进行CAS操作尝试获取一次锁。
    一旦线程进入到阻塞队列,那两者就没什么区别。
    非公平锁形象点描述就是:你刚来的时候可以插队,但如果没插队成功,到了等待队伍里,那就乖乖的排队就行了。

    可中断锁

    lockInterruptibly()lock()的逻辑很相似,起码核心功能代码是很相似的,多了对中断异常的抛出。
    AbstractQueuedSynchronizer#doAcquireInterruptibly(int)
    AbstractQueuedSynchronizer#acquireInterruptibly(int)
    太简单了,这里就不贴代码了

    ReentrantLock 条件

    条件队列内的节点是有序的,在唤醒的操作中,条件队列内的数据也是有序的转移到阻塞队列,所以signal或者signal能够按向条件队列内添加的顺序执行。

    相关文章

      网友评论

          本文标题:ReentrantLock 简单源码分析

          本文链接:https://www.haomeiwen.com/subject/cctnbqtx.html