美文网首页java收藏
AQS - ReenTrantLock “加锁”源码分析

AQS - ReenTrantLock “加锁”源码分析

作者: Burlong | 来源:发表于2021-12-20 11:00 被阅读0次

    加锁操作(公平、独占锁情形下)

    假设有T1、T2、T3三个线程按 T1 → T2 → T3 的先后顺序来抢锁:

    • T1 → 通过修改state=1 获取锁成功
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
    public final void acquire(int arg) {
        // T1只会在第一个判断之后返回
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }
    
    // java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
        // 1、确保当前节点是CHL队列中的第一个线程
        // 2、cas操作将state改为1
        // 3、将exclusiveOwnerThread指向当前线程,标记当前持有锁的线程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    • T2 → CAS修改state=1失败(获取锁失败) → 构建双向链表队列:Head(Thread为null,waitState=-1)、tail(Thread为T2,waitState=0)→ 进入park阻塞
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
    public final void acquire(int arg) {
            // tryAcquire: T2在第一个判断后发现拿不到锁(tryAcquire返回false)
            // addWaiter: 则调用addWaiter尝试入队
            // acquireQueued: 入队完成后,再次去尝试拿锁,如果拿到锁则出队,拿不到则park(阻塞)。此处注意acquireQueued返回值:如果是true代表被中断过,那么需要调用selfInterrupt中断当前线程??
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }
    
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 由于是这个同步器第一次入队,因此pred(tail)肯定为空,那么T2会直接到这,去创建一个队列并入队
        enq(node);
        return node;
    }
    
    // 很经典的一个线程安全的双向链表构建范式!
    // 特点:只有一处返回,即队列构建完成(构建队列的动作也可能在中间被其他线程给完成)并入队成功才返回head节点
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 先构建一个虚拟节点作为head节点,cas设置head节点
                if (compareAndSetHead(new Node()))
                    // 如果成功了,将head和tail同时指向这个虚拟节点
                    tail = head;
            } else {
                // 说明队列已经构建完成,则cas尝试让当前节点入队(尾插)
                // 也就是将上一个tail作为当前节点的prev节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    // 入队成功后,将上一个tail的下个节点指向当前节点,完成入队
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点就是head并且获取锁成功(tryAcquire方法前面已分析),则执行当前节点出队操作
                if (p == head && tryAcquire(arg)) {
                    // 下面操作旨在将当前节点置为一个dummy node供head节点引用
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 拿不到锁,则
                // 1、shouldParkAfterFailedAcquire:判断前驱节点waitStatus是否为-1,否则先改前驱节点的waitStatus为-1,在第二次进入这段逻辑判断则会返回true,进入parkAndCheckInterrupt
                // 2、parkAndCheckInterrupt:将当前线程挂起
                // (由于LockSupport.park(Thread)方法可被中断唤醒,因此如果当前线程被中断唤醒了,那么当前方法最终返回值则为true)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 这段比较复杂,待分析
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    • T3 → CAS修改state=1失败(获取锁失败) → 入队:构建一个节点(Thread为T3,waitState=0),修改T2所在的节点(即tail节点)的waitState=-1,再将tail指向当前新建的节点 → 进入park阻塞
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
    public final void acquire(int arg) {
        // T3此处与T2唯一的不同点在于入队操作时不需要再创建队列,只需创建节点尝试入队
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
        }
    
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            // 由于T2已创建了队列,则tail不为空,那么尝试入队
            // 可以发现下面这块代码和java.util.concurrent.locks.AbstractQueuedSynchronizer#enq的for循环中else分支基本一模一样,也就是尾插入队操作
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    // T3除了在入队操作上不需再创建队列之外,其他的代码逻辑跟T2完全相同
    

    相关文章

      网友评论

        本文标题:AQS - ReenTrantLock “加锁”源码分析

        本文链接:https://www.haomeiwen.com/subject/lwaafrtx.html