美文网首页
AQS(AbstractQueuedSynchronizer)

AQS(AbstractQueuedSynchronizer)

作者: 程序员札记 | 来源:发表于2022-03-23 08:40 被阅读0次

AbstractQueuedSynchronizer(AQS)是JDK中实现并发编程的核心,平时我们工作中经常用到的ReentrantLock,CountDownLatch等都是基于它来实现的。

如何保证并发

AbstractQueuedSynchronizer 维护了一个state(代表了共享资源)和一个FIFO线程等待队列(多线程竞争资源被阻塞时会将线程放入此队列)。
由于state是由volatie修饰的所以该变量的改动都是立等可见的。


image.png

注意这个state 是个integer ,就意味着不同的数值可以表达不同的状态,而不是简单的0 或者1.
1.共享资源状态
private volatile int state;

  1. 操作共享资源状态操作方法
// 读取该值
protected final int getState() {
    return state;
}

// 更新该值 线程不安全
// 当独享该状态时使用该方法更加快捷,节省计算资源

protected final void setState(int newState) {
    state = newState;
}

// 自旋更新该值 线程安全
// 当竞争修改该状态时可用该方法
protected final boolean compareAndSetState(int expect, int update) {
   return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

如何争夺资源

AQS 定义了两种资源共享的方式 Exclusive(独占,一时间只有一个线程能访问该资源)、Share (共享,一时间可以有多个线程访问资源).

  • 独占: 假设state初始状态为0,表示未锁定状态。线程A想使用该资源就把state修改为了1,那么线程B来访问资源时发现state是1并不是0他就会被AQS送入等待队列,
    直到线程A将该资源设置为0。
  • 共享:假设state初始状态为N,当有线程来访问后N就减少1个,直到N=0 这时就会阻塞新的线程来访问资源。当某一个线程执行完毕后会将state+1,相当于释放了该线程持有的锁。这样新的线程就可以继续访问该资源。

独占模式就像共享单车一时间只有一个人可以骑这个共享单车,共享模式就像公交车可以上去很多人,但是人一旦上满了就不能在上人了,必须要等车上的人下来后才能继续上人。

获取独占资源

独占获取资源( acquire(int i) )

此方法是独占模式下线程获取共享资源的入口。如果获取到了资源,线程直接返回,否则进入等待队列,直到获取到资源为止,而且个过程忽略中断的影响,这就是锁定的意义。获取到资源就可以去执行锁定范围内的代码了。

public final void acquire(int arg) {
    // 由于&&的短路特性 获取到权限后 后面的等待队列等一系列功能将不再执行
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

该方法执行步骤:

  • tryAcquire() 尝试直接去获取资源,如果成功则让当前执行线程继续执行。就不需要添加等待队列了。
  • addWaiter() 将该线程加入等待队列的尾部,并且标记为独占模式。
  • acquireQueued() 使线程在等待队列中获取资源,一直到获取到资源了才继续执行,如果在整个等待过程中被中断,返回true,否则返回false。
  • selfInterrupt() 如果线程获取到资源后,发现线程被中断过则立即将线程中断。

tryAcquire() 直接获取资源

// 独占模式下尝试获取资源 成功则返回true
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

我们可以看到!tryAcquire(arg)是个取反的操作,也就是只有直接无法获取当前资源时才执行下一步。
然而AQS并没有提供实现方式。具体实现方式需要继承该框架的子类去实现。至于具体如何才能获取锁就是用户自定义的事情了。
至于为什么没有写成abstract。是为了如果不用该方法也不需要重写该方法。
在上一章的例子中,我给了一个具体实现。使用CAS方式获取state的权限。如果获取到了将当前线程放入独占执行线程中。否则视为没有抢到资源。

/**
 * 尝试获取锁
 */
@Override
protected boolean tryAcquire(int arg) {
    // 使用CAS方式修改状态。
    // 修改成功则继续执行线程
    // 线程会阻塞在这里等待。
    if(compareAndSetState(0,1)){
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    // 否则不能执行线程
    return false;
}

addWaiter()加入等待队列

将当前线程节点放入队列然后返回当前节点信息。

队列与节点的具体情况参考

private Node addWaiter(Node mode) {

    // 构造出一个新的等待线程节点。

Node node = new Node(Thread.currentThread(), mode);

    // 获取一下队列尾部的节点,如果节点存在则直接将当前节点挂接在尾部

    Node pred = tail;

    if (pred != null) {

        // 将当前尾部节点设置为前驱节点。

        node.prev = pred;

        // CAS 方式更新尾部节点。

        if (compareAndSetTail(pred, node)) {

            // 将原尾部节点的下一个节点设置为node

            // 双向链表设计

            pred.next = node;

            return node;

        }

    }

    // 如果挂载队尾节点也存在竞争 则使用无限CAS自旋方式设置队尾。

    enq(node);

    return node;

}

// 自旋方式挂载尾节点

private Node enq(final Node node) {

    for (;;) {

        Node t = tail;

        // 当尾节点不存在时 头节点也不存在

        // 该条件只可能是在尚未有队列的时候创建队列的第一个node时会触发。

        if (t == null) {

            // CAS方式更新头节点创建一个新的头节点 将头节点和为节点都指向创建的新节点。

            // 由于是CAS方式只会有一个线程会创建成功。一旦有头节点就可以继续接入下个节点了

            if (compareAndSetHead(new Node()))

                tail = head;

} else {

            // 如果存在尾节点则执行过程和之前快速插入队列的逻辑相似。

            // 更新尾巴节点 挂接双向列表。

            node.prev = t;

            if (compareAndSetTail(t, node)) {

                t.next = node;

                return t;

            }

        }

    }

}

将节点添加到尾部流程:

  1. 依据当前线程和线程模型创建一个新的节点。

  2. 尝试将当前节点直接挂在尾节点上。

  3. 如果尾节点不存在则头节点也不存在,自旋方式创建一个尾节点和头节点。如果在过程中尾节点已经被添加,这继续将该节点挂载在尾节点上,

具体流程入下图:


image.png

acquireQueued()等待休息直到其他线程唤醒

当通过tryAcquire()方法获取线程失败后,使用addWaiter()将该线程放入队列的尾部。然后进入等待状态,知道其他线程释放资源后唤醒此线程,自己在拿到资源,然后就能干自己的事情了。这就和买好了票排到了队尾等待前面的人一个一个完成任务后轮到自己。实际上该方法一直在自旋(发呆)直到自己变成了头结点,如果变成头结点后该线程已经中断则中断该线程的执行。

final boolean acquireQueued(final Node node, int arg) {
    // 标记自己是否拿到了资源 true为没有获取到。
    boolean failed = true;
    try {
        // 标记等待过程中是否被中断过
        boolean interrupted = false;
        // 开始自旋
        for (;;) {
            // 获取当前节点的前置节点
            final Node p = node.predecessor();
            // 如果自己的前置节点是头结点,并且自己获取到了资源权限
            if (p == head && tryAcquire(arg)) {
                // 将自己设置为头结点
                setHead(node);
                // 将前置节点的后续节点强连接断开
                // 相当于自己已经是头结点了不需要前置节点了。帮助GC回收垃圾
                p.next = null;
                //标记已经获取到节点
                failed = false;
                // 判断该线程是否被中断过。
                return interrupted;
            }
            // 这两个方式是检查状态、和让线程休息 下面会详细讲解这两个方法的作用
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

此方法用于检查状态,看看自己是否可以休息了。
为了更好理解我重复下上一章的知识点,AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 检查前置节点的装填
   int ws = pred.waitStatus;
   // 如果前置节点的状态为 等待唤醒的后置节点,这放心休息。
   if (ws == Node.SIGNAL)
       return true;
    // 大于0表示该节点已经被中断,或者已经结束,
   if (ws > 0) {
     // 如果前置节点不是正常的等待状态那么就继续往前找直到找到一个正在等待装填的节点。将其后置节点断开接上当前节点。GC会回收一堆相互引用又没有外部引用的节点。
       do {
           node.prev = pred = pred.prev;
       } while (pred.waitStatus > 0);
       pred.next = node;
   } else {
        // 如果该节点是其他状态就将其修改为等待状态 主要照顾共享节点
       compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
   }
   return false;
}

次方法就是让线程去等待。如果线程被中断过则返回true

private final boolean parkAndCheckInterrupt() {
    // 调用park让线程进入wait状态
    LockSupport.park(this);
    // 检查线程是否中断过。
    return Thread.interrupted();
}

acquireQueued()的执行流程如下。

  • 调节点进入队列尾巴后,检查状态,找到安全的等待位置等待。
  • 调调用park()让线程进入wait状态,等待unPark()或者interrput()唤醒。
  • 调被唤醒后,看自己是不是有资格拿到资源。如果拿到,将head指向当前节点。并返回从入队到拿到的整个过程中是否被中断过。如果没有则回到1号流程循环执行。

具体流程入下图:


image.png

selfInterrupt()中断该线程的执行

如果线程在等待过程中被中断过,那么获取到资源后才会通知线程中断。

//自助中断该线程。
private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

acquire()竞争获取资源流程总结

  • 调用用户自定义的同步器tryAcquire()去尝试获取资源,如果成功则直接进入临界区执行代码。
  • 调没有获取到资源则将该线程组装成一个结点放入队列尾部分,并且标记为独占模式。
  • 调使线程在队列中休息,唤醒时会被unPark()会尝试获取资源。获取到资源后才返回.如果在整个等待过程中被中断过则返回true 否则返回false.
  • 调如果线程在等待过程中被中断过,他是不会响应的,只是在获取到资源后直接调用自我中断,将中断线程。
    流程图如下


    image.png

相关文章

网友评论

      本文标题:AQS(AbstractQueuedSynchronizer)

      本文链接:https://www.haomeiwen.com/subject/roeqjrtx.html