美文网首页
J.U.C之基——AQS

J.U.C之基——AQS

作者: 囧囧有神2号 | 来源:发表于2018-05-02 12:43 被阅读0次

在J2SE 1.5的java.util.concurrent包(下称j.u.c包)中,大部分的同步器(例如锁,屏障等等)都是基于AbstractQueuedSynchronizer(下称AQS类)这个简单的框架来构建的;
那么J.U.C包下的同步器主要有以下几个功能:

  • 内部同步状态的管理
  • 同步状态的更新和检查操作
  • 且至少有一个方法会导致调用线程在同步状态被获取时阻塞,以及在
    其他线程改变这个同步状态时解除线程的阻塞

而AQS就实现了以上功能,供其他同步器使用。
所有同步器都有两个基本方法,acquire,release。acquire操作阻塞调用的线程,直到或除非同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。(不用同步器命名不同Lock.lock,Semaphore.acquire,CountDownLatch.await和FutureTask.get...)

之前提过Synchronized内置锁,JVM对其进行了许多优化,其性能已经比ReentrentLock更好,但是常规的JVM锁优化策略并不适用于严重依赖于J.U.C包的典型多线程服务端应用。
大部分情况下,特别在同步器有竞争的情况下,稳定地保证其效率才是J.U.C包的主要目标。

同步器的acquire与release

acquire

while(同步状态不允许acquire){
        放入队列  if  没有进队;
        依具体需求来决定是否阻塞当前线程;
}
出队 if 已入对;

release

更新同步状态;
if(状态允许被足阻塞线程acquire){
      解除一个或多个队列里的阻塞线程;
}

要实现上述功能需要三个基本组建的相互协作:

  • 同步状态的原子性管理
  • 线程的阻塞与解除阻塞
  • 队列的管理

同步状态
AQS用单个32位int值来保存同步状态
阻塞
LockSupport.park阻塞LockSupport.unpark解阻塞

LockSupport.park()
LockSupport.park(Object)
LockSupport.parkNanos(Object, long)
LockSupport.parkNanos(long)
LockSupport.parkUntil(Object, long)
LockSupport.parkUntil(long)
LockSupport.unpark(Thread)

队列
在AQS中采用CHL列表来解决有序的队列的问题。

CLH队列
一个新的节点node,通过原子操作入队:
do{
      pred = tail;
} while(!tail.compareAndSet(pred, node);

每一个节点的释放状态都保存在前驱节点中:

while (pred.status != RELEASED);

自旋后的出队操作:

head = node;

优点在于其入队和出队操作是快速、无锁的,以及无障碍的(即使在竞争下,某个线程总会赢得一次插入机会而能继续执行);且探测是否有线程正在等待也很快(只要测试一下head是否与tail相等);同时,“释放”状态是分散的,避免了一些不必要的内存竞争。

AQS

public abstract class AbstractQueuedSynchronizer extends
    AbstractOwnableSynchronizer implements java.io.Serializable { 
    //等待队列的头节点
    private transient volatile Node head;
    //等待队列的尾节点
    private transient volatile Node tail;
    //同步状态
    private volatile int state;
    ......
    static final class Node {
//表示共享模式,如CountDownLatch
        static final Node SHARED = new Node();
//表示独占模式,如ReentrentLoack
        static final Node EXCLUSIVE = null;
//节点操作因为超时或者对应的线程被interrupt。节点不应该留在此状态,
//一旦达到此状态将从CHL队列中踢出。
        static final int CANCELLED =  1;
//等待触发,即节点为SIGNAL ,后继节点会挂起
        static final int SIGNAL    = -1;
//表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
        static final int CONDITION = -2;
//状态需要向后传播
        static final int PROPAGATE = -3;
// 初始状态为0,新生的非CONDITION节点都是此状态。
        volatile int waitStatus;
//此节点的前一个节点。节点的waitStatus依赖于前一个节点的状态。
        volatile Node prev;
//此节点的后一个节点。后一个节点是否被唤醒(uppark())依赖于当前节点是否被释放。
        volatile Node next;
//节点绑定的线程
        volatile Thread thread;
//标记当前节点的模式是共享还是独占
        Node nextWaiter;
        ...
    }

}
AQS.Node

实现原理

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//在节点加入队列过程中若线程被中断,则会调用该方法,底层调用interrupt()
            selfInterrupt();  
    }

需要子类重写tryAcquire与tryRelease方法利用CAS来修改同步状态status;
多线程下各个线程都会尝试修改状态,如果可以修改则tryAcquire返回true,acquire直接返回;若不能修改,放进队列;

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter方法建一个当前线程的Node放入队列,值得注意的是利用了CAS来完成插入操作;
接下来总的逻辑是:放入队列后,会检查前一个节点的状态,前一个节点状态为SIGNAL则挂起当前线程通过LockSupport.park(this);
来看看acquireQueued的实现

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

for循环里检验前一个节点的状态,为啥要用无限循环呢?为防止其被意外唤醒。
当前一个节点为head时说明该轮到它了,再次尝试tryAcquire;
没轮到它的时候,会执行shouldParkAfterFailedAcquire,该方法只有在节点状态为SIGNAL返回true,CANCELLED则删除节点,其它情况就用CAS将状态改为SIGNAL;
来看看shouldParkAfterFailedAcquire方法实现:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire返回true,则会调用parkAndCheckInterrupt将线程挂起,被唤醒后根据线程中断标记来返回boolean值;

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted(); //会清除标记位
    }

看代码,假设一种情况,A之前一直挂起,现在轮到A了,也就是前一个节点是head,head已经释放了同步状态,与是唤醒A,A在acquireQueued方法for循环中苏醒尝试tryAcquire,但是这时一个新的线程先一步执行acquire方法,先于A 执行tryAcquire,A又得挂起;由此可看出这是非公平的,有线程插队

Release

子类需要实现tryRelease;
重置同步状态位0,唤醒后继节点;在这里也有被插队的可能,因为同步状态已经归零。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            //这里h.waitStatus == 0的唯一可能是head没有后继节点
            //waitStatus默认为0,后继节点加入队列后将前一个节点waitStatus设为SIGNAL
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
//唤醒后继节点
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

总结

举例说明变化情况:现在有一个线程N正在执行,阻塞队列为空,A线程tryAcquire失败,addWaiter创建了一个空的Node作为Head,它的next指向A线程的Node,随后进入acquireQueued,再次尝试tryAcquire因为线程可能已经执行完了,失败调用shouldParkAfterFailedAcquire,此时将头Head的同步状态由0变为SIGNAL返回false,回到acquireQueued里再次循环,还是尝试再次tryAcquire,失败调用shouldParkAfterFailedAcquire,由于Head的waitStatus为SIGNAL返回true,进入parkAndCheckInterrupt,将A线程阻塞;若又一线程B也失败,它将会将A 的waitStatus变为SIGNAL,排在A的后面阻塞着;
执行的线程完成了,release释放同步状态,唤醒阻塞队列里的线程;接着上面的逻辑,首先N线程tryRelease成功(存在被插队的可能),取出head节点执行unparkSuccessor,将head节点waitStatus重置为0,取出head.next也就是A,LockSupport.unpark唤醒A线程,逻辑回到了A阻塞的地方也就是acquireQueued的for循环里,再次尝试tryAcquire(在这里可能被插队),成功,将A设为head,将原先为空的head的next指针清除以便GC回收;

以ReentrantLock为例:

image.png

相关文章

网友评论

      本文标题:J.U.C之基——AQS

      本文链接:https://www.haomeiwen.com/subject/rxlqrftx.html