美文网首页
J.U.C之基——AQS

J.U.C之基——AQS

作者: 囧囧有神2号 | 来源:发表于2018-05-02 12:43 被阅读0次

    在J2SE 1.5的java.util.concurrent包(下称j.u.c包)中,大部分的同步器(例如锁,屏障等等)都是基于AbstractQueuedSynchronizer(下称AQS类)这个简单的框架来构建的;
    那么J.U.C包下的同步器主要有以下几个功能:

    • 内部同步状态的管理
    • 同步状态的更新和检查操作
    • 且至少有一个方法会导致调用线程在同步状态被获取时阻塞,以及在
      其他线程改变这个同步状态时解除线程的阻塞

    而AQS就实现了以上功能,供其他同步器使用。
    所有同步器都有两个基本方法,acquire,release。acquire操作阻塞调用的线程,直到或除非同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。(不用同步器命名不同Lock.lock,Semaphore.acquire,CountDownLatch.await和FutureTask.get...)

    之前提过Synchronized内置锁,JVM对其进行了许多优化,其性能已经比ReentrentLock更好,但是常规的JVM锁优化策略并不适用于严重依赖于J.U.C包的典型多线程服务端应用。
    大部分情况下,特别在同步器有竞争的情况下,稳定地保证其效率才是J.U.C包的主要目标。

    同步器的acquire与release

    acquire

    while(同步状态不允许acquire){
            放入队列  if  没有进队;
            依具体需求来决定是否阻塞当前线程;
    }
    出队 if 已入对;
    

    release

    更新同步状态;
    if(状态允许被足阻塞线程acquire){
          解除一个或多个队列里的阻塞线程;
    }
    

    要实现上述功能需要三个基本组建的相互协作:

    • 同步状态的原子性管理
    • 线程的阻塞与解除阻塞
    • 队列的管理

    同步状态
    AQS用单个32位int值来保存同步状态
    阻塞
    LockSupport.park阻塞LockSupport.unpark解阻塞

    LockSupport.park()
    LockSupport.park(Object)
    LockSupport.parkNanos(Object, long)
    LockSupport.parkNanos(long)
    LockSupport.parkUntil(Object, long)
    LockSupport.parkUntil(long)
    LockSupport.unpark(Thread)
    

    队列
    在AQS中采用CHL列表来解决有序的队列的问题。

    CLH队列
    一个新的节点node,通过原子操作入队:
    do{
          pred = tail;
    } while(!tail.compareAndSet(pred, node);
    

    每一个节点的释放状态都保存在前驱节点中:

    while (pred.status != RELEASED);
    

    自旋后的出队操作:

    head = node;
    

    优点在于其入队和出队操作是快速、无锁的,以及无障碍的(即使在竞争下,某个线程总会赢得一次插入机会而能继续执行);且探测是否有线程正在等待也很快(只要测试一下head是否与tail相等);同时,“释放”状态是分散的,避免了一些不必要的内存竞争。

    AQS

    public abstract class AbstractQueuedSynchronizer extends
        AbstractOwnableSynchronizer implements java.io.Serializable { 
        //等待队列的头节点
        private transient volatile Node head;
        //等待队列的尾节点
        private transient volatile Node tail;
        //同步状态
        private volatile int state;
        ......
        static final class Node {
    //表示共享模式,如CountDownLatch
            static final Node SHARED = new Node();
    //表示独占模式,如ReentrentLoack
            static final Node EXCLUSIVE = null;
    //节点操作因为超时或者对应的线程被interrupt。节点不应该留在此状态,
    //一旦达到此状态将从CHL队列中踢出。
            static final int CANCELLED =  1;
    //等待触发,即节点为SIGNAL ,后继节点会挂起
            static final int SIGNAL    = -1;
    //表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
            static final int CONDITION = -2;
    //状态需要向后传播
            static final int PROPAGATE = -3;
    // 初始状态为0,新生的非CONDITION节点都是此状态。
            volatile int waitStatus;
    //此节点的前一个节点。节点的waitStatus依赖于前一个节点的状态。
            volatile Node prev;
    //此节点的后一个节点。后一个节点是否被唤醒(uppark())依赖于当前节点是否被释放。
            volatile Node next;
    //节点绑定的线程
            volatile Thread thread;
    //标记当前节点的模式是共享还是独占
            Node nextWaiter;
            ...
        }
    
    }
    
    AQS.Node

    实现原理

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //在节点加入队列过程中若线程被中断,则会调用该方法,底层调用interrupt()
                selfInterrupt();  
        }
    

    需要子类重写tryAcquire与tryRelease方法利用CAS来修改同步状态status;
    多线程下各个线程都会尝试修改状态,如果可以修改则tryAcquire返回true,acquire直接返回;若不能修改,放进队列;

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    addWaiter方法建一个当前线程的Node放入队列,值得注意的是利用了CAS来完成插入操作;
    接下来总的逻辑是:放入队列后,会检查前一个节点的状态,前一个节点状态为SIGNAL则挂起当前线程通过LockSupport.park(this);
    来看看acquireQueued的实现

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    for循环里检验前一个节点的状态,为啥要用无限循环呢?为防止其被意外唤醒。
    当前一个节点为head时说明该轮到它了,再次尝试tryAcquire;
    没轮到它的时候,会执行shouldParkAfterFailedAcquire,该方法只有在节点状态为SIGNAL返回true,CANCELLED则删除节点,其它情况就用CAS将状态改为SIGNAL;
    来看看shouldParkAfterFailedAcquire方法实现:

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    shouldParkAfterFailedAcquire返回true,则会调用parkAndCheckInterrupt将线程挂起,被唤醒后根据线程中断标记来返回boolean值;

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted(); //会清除标记位
        }
    

    看代码,假设一种情况,A之前一直挂起,现在轮到A了,也就是前一个节点是head,head已经释放了同步状态,与是唤醒A,A在acquireQueued方法for循环中苏醒尝试tryAcquire,但是这时一个新的线程先一步执行acquire方法,先于A 执行tryAcquire,A又得挂起;由此可看出这是非公平的,有线程插队

    Release

    子类需要实现tryRelease;
    重置同步状态位0,唤醒后继节点;在这里也有被插队的可能,因为同步状态已经归零。

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                //这里h.waitStatus == 0的唯一可能是head没有后继节点
                //waitStatus默认为0,后继节点加入队列后将前一个节点waitStatus设为SIGNAL
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    //唤醒后继节点
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    总结

    举例说明变化情况:现在有一个线程N正在执行,阻塞队列为空,A线程tryAcquire失败,addWaiter创建了一个空的Node作为Head,它的next指向A线程的Node,随后进入acquireQueued,再次尝试tryAcquire因为线程可能已经执行完了,失败调用shouldParkAfterFailedAcquire,此时将头Head的同步状态由0变为SIGNAL返回false,回到acquireQueued里再次循环,还是尝试再次tryAcquire,失败调用shouldParkAfterFailedAcquire,由于Head的waitStatus为SIGNAL返回true,进入parkAndCheckInterrupt,将A线程阻塞;若又一线程B也失败,它将会将A 的waitStatus变为SIGNAL,排在A的后面阻塞着;
    执行的线程完成了,release释放同步状态,唤醒阻塞队列里的线程;接着上面的逻辑,首先N线程tryRelease成功(存在被插队的可能),取出head节点执行unparkSuccessor,将head节点waitStatus重置为0,取出head.next也就是A,LockSupport.unpark唤醒A线程,逻辑回到了A阻塞的地方也就是acquireQueued的for循环里,再次尝试tryAcquire(在这里可能被插队),成功,将A设为head,将原先为空的head的next指针清除以便GC回收;

    以ReentrantLock为例:

    image.png

    相关文章

      网友评论

          本文标题:J.U.C之基——AQS

          本文链接:https://www.haomeiwen.com/subject/rxlqrftx.html