4、AQS

作者: 神秘空指针 | 来源:发表于2018-09-26 20:18 被阅读0次

AQS 介绍

AQS 全名AbstractQueuedSynchronizer,是所有同步器的基础。它维护了一个状态值state,这个状态值本身不代表任何含义。根据使用的它的子类自己定义。并且提供了几个基础方法

  1. acquire(int arg) 独占模式 -获取共享状态
  2. release(int arg) 独占模式 -释放共享状态
  3. acquireShared(int arg) 共享模式 -获取共享状态
  4. releaseShared(int arg) 共享模式 -释放共享状态

共享状态在AQS 中就是state,在每个子类中的含义不同,所以独占模式和共享模式都需要子类自己去实现。比如ReentrantLock中
state 大于1 代表当前对象锁被重入次数,下面我们结合原码来看一下两种模式的用法。

1、独占模式 -获取同步状态

/**
 * 独占模式 不响应线程的interrupts
 */
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();//私有中断处理,不抛出异常
}

这段代码中tryAcquire 在AQS 中默认是没有实现的,需要子类自己去实现。这里分为三个步骤:

  1. tryAcquire 尝试获取锁
  2. 获取锁失败,addWaiter 创建一个等待节点。 节点属性中 EXCLUSIVE 是独占模式标志 SHARE 是共享模式标志
  3. acquireQueued 尝试将节点线程添加到等待队列中,并将线程挂起。

1、node属性

/**
 * 队列节点 字段
 */
static final class Node {
        //节点状态
        volatile int waitStatus;
        //前一个节点
        volatile Node prev;
        // 后一个节点
        volatile Node next;
        //节点线程
        volatile Thread thread;
        //先一个等待者,从代码中看是标志下一个是 共享节点还是独占节点。目前不知道有什么作用
        Node nextWaiter;
}

2、将节点添加到双向链表中去

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // node 已经初始化了,直接将当前node添加到tail
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //没有初始化,先初始化在设置到tail
    enq(node);
    return node;
}
/**
 * 链表没有初始化,忙循环 加 CAS 确保设置head tail 成功。
 */
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

3、线程加入等待队列acquireQueued,这里挂起线程

/**
 * 1、每一个线程都需要不断自我检测自己的前驱节点是不是头节点
 * 2、如果前驱节点是头节点,则要去尝试获取锁。这里就可以保证获取锁的顺序与加入队列的顺序保持一致。保证了锁的公平性
 * 3、不是头节点,检查前驱节点状态,SIGNAL 态 则挂起当前线程,否则先将前驱节点设置为SIGNAL态在挂起当前线程。 
 */
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//死循环
                final Node p = node.predecessor();
                /**
                 * 前驱节点是头节点,就尝试去获取锁
                 * 成功就设置为head节点,并返回
                 */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取锁失败,检查是否可以挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) //确保失败后,节点从队列中移除
                cancelAcquire(node);
        }
}

    /**
     *  检查是否可以挂起当前线程
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //SIGNAL 是等待唤醒状态
        if (ws == Node.SIGNAL)
            return true;
        // 大于0 只有取消状态,取消状态的直接跳过
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
             //节点默认状态是 0,这里给前节点状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

看了以上代码以后我们看看流程图


image.png

到这里获取共享状态的原理差不多说清楚了,接下来看看释放共享状态的原理。

2、独占模式 -释放同步状态

释放同步状态比较简单,之前说过,获取同步状态失败的线程会被添加到等待队列并挂起。那么同步状态的释放就是相反的操作。
找到头节点的下一个节点,用LockSupport.uppark(Thread) 方法唤醒线程即可。具体代码如下:

//释放同步状态
public final boolean release(int arg) {
        //tryRelease 的具体实现交给其子类
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒下一节点
            return true;
        }
        return false;
    }

private void unparkSuccessor(Node node) {
      
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //找到下一个在等待状态的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒线程
    }

3、共享模式

共享模式基本原理和独占模式是相同的。二者的区别在于独占模式在一个时刻只能一个线程成功获取同步状态,而共享模式可以多个线程同时获取同步状态。

//尝试获取共享锁
 public final void acquireShared(int arg) {
         //tryAcquireShared() 返回值说明一下 
         // 1、大于0 获取共享锁成功,并且后续也可能成功
         // 2、等于0 获取共享锁成功,后续的不会成功
         //3、小于0 获取共享锁失败 
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
 }

//忙循环 获取共享锁
private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { //头节点就尝试获取锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//成功
                        setHeadAndPropagate(node, r);//设置头节点并传播(唤醒下一个节点)
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //这个和独占锁相同 失败挂起锁等待 唤醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

共享模式没有什么特别要说明的地方,基本原理还是相同的。

当然除了以上说到的方法,AQS 还提供了其他的方法。放出来大家了解一下

//与acquire(int arg) 相同,不过次方法支持线程中断
acquireInterruptibly(int arg)

//与acquire(int arg) 相同,不过次方法支持线程中断并且支持等待超时时间
tryAcquireNanos(int arg, long nanosTimeout)

//与acquireShared(int arg) 相同,不过次方法支持线程中断
acquireSharedInterruptibly(int arg)

//与acquireShared(int arg) 相同,不过次方法支持线程中断并且支持等待超时时间
tryAcquireSharedNanos(int arg, long nanosTimeout)

最后总结一下AQS

AQS 内部维护了一个state(称为同步状态),一个FIFO的同步队列。对这个state的操作都是原子的(通过CAS)。当线程获取同步状态
失败(意味着竞争),AQS 将这些线程放入FIFO队列中,并且线程本身通过忙循环不断检测前驱节点的状态,来判断自己是应该挂起线程还是去争取锁。只有获取到锁或者线程中断才会从忙循环中退出。释放同步状态相对简单一些,把头节点的下一节点唤醒去争取锁即可。

参考文章:https://juejin.im/entry/5ae02a7c6fb9a07ac76e7b70

相关文章

网友评论

      本文标题:4、AQS

      本文链接:https://www.haomeiwen.com/subject/fnlinftx.html