美文网首页
AQS源码浅析(5)——锁方法

AQS源码浅析(5)——锁方法

作者: 墨_0b54 | 来源:发表于2022-04-07 21:10 被阅读0次

    acquireQueued条件等待方法及独占模式获取

    • 调用acquireQueued方法时,至少有一个node(就是参数中的node)在队列
    • 同时可能有【新入队的线程】或者【由中断唤醒的线程】会与【当前线程】产生竞争
    • 当前驱节点是head时,就是出队的时机
    • 获取锁失败后会调用shouldParkAfterFailedAcquire(详细见下文)方法
    • 如果退出自旋却未获得锁,说明出现了未知的异常,需要调用cancelAcquire(详细见下文)方法将当前节点取消
    final boolean acquireQueued(final Node node, int arg) { //自旋获得锁,会阻塞当前线程
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//前驱节点
                if (p == head && tryAcquire(arg)) {//前驱节点是头节点就获取锁
                    setHead(node); //成功获取锁(其他线程失败,都会阻塞),当前node出队
                    p.next = null; // help GC
                    failed = false;
                    return interrupted; //如果在竞争锁的过程中发生了中断,返回true
                }
                //线程获取锁失败后的操作
                if (shouldParkAfterFailedAcquire(p, node) &&//判断线程获取锁失败后,是否应该阻塞
                    parkAndCheckInterrupt()) //阻塞线程,并在被唤醒后检查是否被中断
                    interrupted = true;
            }
        } finally {
            if (failed) // tryAcquire发生未知的错误导致线程退出自旋,shouldParkAfterFailedAcquire、parkAndCheckInterrupt应该不会出错
                cancelAcquire(node); //取消当前线程
        }
    }
    

    shouldParkAfterFailedAcquire检查和更新获取锁失败的节点的状态

    • 是所有acquire循环中的主要控制信号,检查并更新未能获取锁的节点的状态,要求pred必须是node的前驱节点
    • 如果线程前驱节点状态是SIGNAL,那么可以安全的应该阻塞,返回 true
    • 如果线程状态为CANCELLED,去除前面所有的已取消的节点(head节点一定不是CANCELLED)返回false
    • 如果节点状态是0或者PROPAGATE的时候,将前驱状态设置为SIGNAL,那么下次循环获取锁失败后进入这个方法时,将会返回true
    • 也就是说线程如果获取锁失败,在正常情况下前驱节点都应该是SIGNAL,代表自己准备好被唤醒了
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) //pred节点是SIGNAL状态表示node应该被唤醒,当前node可以安全的阻塞,返回true
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) { //跳过CANCELLED状态的前驱节点
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {//pred的waitStatus一定是0 或者 PROPAGATE,说明当前节点不需要阻塞,调用者需要重试
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //前驱节点状态设置为SIGNAL
        }
        return false;
    }
    

    cancelAcquire取消节点尝试acquire

    取消当前节点,如果节点在队列的中间就从队列中去掉,否则直接唤醒一个有效的后继节点

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        node.thread = null;//清除线程对象
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0) //跳过已取消的前驱节点
            node.prev = pred = pred.prev;
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;//不会有其他线程干扰,这一步后其他线程会跳过
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) { //如果node已经在队尾,设置尾节点
            compareAndSetNext(pred, predNext, null);//更新pred的next指针为null
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head && //确定node前驱节点不是head,说明现在node已经不在队首了
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && //前驱状态小于0,或者将未取消的前驱设置为SIGNAL状态(标记后继节点需要唤醒)
                pred.thread != null) {//前驱拥有线程对象
                Node next = node.next;//if判断前驱节点不是虚节点
                if (next != null && next.waitStatus <= 0) //判断后继有效
                    compareAndSetNext(pred, predNext, next);//连接node的前驱和后继
            } else {//如果node的前驱是虚节点,唤醒一个后继
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
    

    cancelAcquire与shouldParkAfterFailedAcquire一样,都有一段跳过CANCELLED状态的代码如下:

    Node pred = node.prev;
    while (pred.waitStatus > 0) //跳过已取消的前驱节点
        node.prev = pred = pred.prev;
    

    为什么这里不需要用CAS呢?

    • 首先只有线程可以把自己的节点状态设置为CANCELLED
    • 其他的线程执行这段代码时,会直接跳过CANCELLED状态的节点
    • 所以取消的线程与其他线程不会互相干扰

    这也是cancelAcquire的逻辑:

    • 如果头部同时有好几个节点取消,那么唤醒操作交给next链靠后的取消节点即可
    • 如果在队列中间取消,那么只需要最靠后的节点连接未取消的前驱和后驱即可
    • 如果在队列尾部取消,那么只需要最靠前的节点将前驱设置为尾部即可

    doAcquireNanos和doAcquireInterruptibly

    • doAcquireNanos与acquireQueued的区别是加入了超时时间限制,超时直接取消节点
    • doAcquireInterruptibly与acquireQueued的区别是响应中断,发生中断直接取消节点并抛中断异常
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    doAcquireShared共享不可中断模式获取锁

    共享模式的acquireQueued,doAcquireShared与acquireQueued的区别是会传播唤醒后继节点

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//共享节点入队
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//代表获取成功
                        setHeadAndPropagate(node, r);//node变成头节点,传播唤醒后继
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    doAcquireSharedInterruptibly和doAcquireSharedNanos

    • doAcquireSharedNanos与doAcquireShared的区别是加入了超时时间限制,超时直接取消节点
    • doAcquireSharedInterruptibly与doAcquireShared的区别是响应中断,发生中断直接取消节点并抛中断异常
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    相关文章

      网友评论

          本文标题:AQS源码浅析(5)——锁方法

          本文链接:https://www.haomeiwen.com/subject/dhqssrtx.html