美文网首页
AQS源码分析

AQS源码分析

作者: HannahLi_9f1c | 来源:发表于2019-04-14 16:05 被阅读0次

一、AQS是什么

AQS(AbstratctQueuedSynchronizer)是一个抽象类的同步器,使用了模板方法的设计模式,也就是在抽象类AQS中定义算法的骨架,然后具体的子类需要重写AQS的一些方法,这样就能实现不同场景所需要的同步器,java中的ReetrantLock,ReentrantReadWriteLock等很多锁都是基于AQS实现的。AQS简单来说就是有一个当前工作线程并且维护一个被锁定的线程队列,并且实现独占锁或者是共享锁。
大概原理是这样的


image

二、AQS怎么使用

1. 模板方法

  • 模板方法使用继承,在父类中定义了一系列算法骨架,和一些可复用的方法,子类继承父类的方法,在不同的应用场景重写算法,满足不同需求。
  • 模板方法和策略模式很像,都是在高层定义算法,底层实现具体的实现,模板方法多是用继承实现,而策略模式通过委托实现。策略模式的好处就是耦合比较低,但是AQS是用模板方法实现的,原因就是可以在模板方法定义一些公共的方法,子类可以复用,而策略模式则不能,策略模式更多体现的是可替换性,也就是不同的算法之间可替换,而不会对应用造成影响。

2. 自定义AQS

  • 了解了模板方法之后,我们就知道如何使用AQS自定义同步器了,也就是要继承AQS,然后重写它的一些方法
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

  • 独占方式重写tryAcquire和tryRelease,共享方式重写tryAquireShared和tryReleaseShared方法
  • 重写实现的主要是对资源state的占用和释放方式。
  • 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
  • 学习一下ReetrantLock的源码,我们大概就知道怎么使用了。

三、学习源码之前需要了解

1. 线程的状态

线程状态转换

image
image

当线程进入到synchronized方法或者synchronized代码块时,线程切换到的是BLOCKED状态,而使用java.util.concurrent.locks下lock进行加锁的时候线程切换的是WAITING或者TIMED_WAITING状态,因为lock会调用LockSupport的方法。

2. CAS+volatile实现线程安全

  • volatile语义可见性+有序性,但不能保证并发安全,使用场景:

(1)对变量的写操作不依赖于当前值。
(2)该变量没有包含在具有其他变量的不变式中。

  • CAS则是一种乐观锁策略,通过一条处理器级指令保证并发安全,如果没有获得锁,就会进入自旋尝试获取锁。也就是在竞争不是很大的时候的一种并发策略。
  • CAS+volatile的使用既保证了这个变量的可见性,又能在存在竞争的时候线程安全地获取资源,虽然volatile修改存在并发安全问题,但是用CAS可以避免这个问题。
  • 至于什么时候不用synchronized而是CAS+volatile非阻塞乐观锁?应该在synchronized的阻塞造成了系统性能的瓶颈的时候,因为CAS+volatile的编写更加复杂

三、AQS源码分析

1. 数据结构

        static final Node SHARED = new Node();//共享模式的标记
        static final Node EXCLUSIVE = null;//独占模式的标志
        static final int CANCELLED =  1;//表明线程已经取消
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        //一种线程等待状态,需要工作线程唤醒
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;//一种等待状态,表明线程有条件性等待
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:状态域,包括以下值
         *   SIGNAL:     * 这个节点的继任者被阻塞(通过park),所以当前线程当释放或者取消的时候必须要唤醒它的继任者
         为了避免竞争,获取方法需要先表明signal,然后原子性获取。失败之后进入bolck状态
         *   CANCELLED: 
         这个节点由于超时或者中断被取消。节点不释放它的资源。尤其,一个个被取消的节点不会再一次blocks
         *   CONDITION:  
         这个节点当前在一个条件等待队列
         *   PROPAGATE: 
         非负表明节点不需要唤醒。所以,大多数代码不需要检查特点值,只是用来标记
         
         这个参数初始化为0
         condition会初始化为CONDITION,通过CAS修改值,或者无条件地volatile写**/
        volatile int waitStatus;

       
         /*当前工作线程的前继线程
         由于head线程总是不取消,所以一个节点只能通过aquire获取。一个取消的线程不能aquire。一个线程总是自我取消,而不是其他节点来取消*/
        volatile Node prev;

      
        volatile Node next;

    
         /*入队的线程.构造函数初始化,使用后置为null*/
        volatile Thread thread;
        Node nextWaiter;
  • waitStatus表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
  • 0状态:值为0,代表初始化状态。
/**
     等待队列头节点,除了初始化,通过setHead修改。注意:如果头存在,它的等待状态不能是CANCELLED.
     */
    private transient volatile Node head;

    /**
     尾节点,通过enq方法添加新的等待节点
     */
    private transient volatile Node tail;

    /**
     * 资源state.
     */
    private volatile int state;
  • AQS维护着一个队列,维护着共享资源的访问如下


    image

    state的有三种访问方式

    /**
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
  • 那么它为什么有两种方式修改state呢,我觉得可能是使用场景不同吧

2. 独占模式下的获取acquire

2.1. acquire

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们可以看到这个入口函数是final修饰的,说明设计的时候不希望子类修改这个方法,它先是用tryAcquire获取,如果获取成功就返回;addWaiter将该线程加入等待队列的尾部,并标记为独占模式;acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false;如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

2.2 tryAquire

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

这个方法通过protected修饰,只有子类访问,也就是在创建同步器的时候自定义获取方式。这个AQS是抽象类,所以只抛出异常

2.3 addWaiter

  private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

这个方法先尝试快速入队,如果成功就返回,否则通过enq入队

  private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq主要是通过自旋一直尝试将节点插入尾部,所以可以在快速尝试失败后调用。这个就是CAS自旋volatile的经典使用

2.4 acquiredQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
                    //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

由于尝试获取资源失败,于是进行等待队列等待别的线程结束唤醒自己。如果自己的前任节点是head,并且获取成功,返回;不然就在waiting中一直等待

2.5

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;//如果已经设置了前结点释放资源后会唤醒它,那么就可以返回休息了
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
             /*
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。     */
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

不是SIGNAL,需要加入等待队列等待,同时尝试下看有没有机会轮到自己

2.6 总结

流程如下


image

3. 独占模式下的release

3.1 release

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

直接利用tryRelease释放线程,因为独占模式下只有head线程占有资源,因此直接释放即可。然后要唤醒一个等待线程

3.2 tryRealse

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

这个方法也是需要在子类重写的,父类直接抛出异常。重写的时候如果释放资源成功返回true,否则返回false

3.3 unparkSuccessor

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //头节点的线程状态<0,那么利用cas把它状态设置为0
        if (ws < 0)
           compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        //从tail往前遍历,寻找最前面需要唤醒的节点,然后unpark唤醒
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

这个函数就是寻找继任线程,并将它唤醒

4. acquireShared

  • 然后我们继续研究非独占式的获取共享资源和释放共享资源

4.1 acquireShared

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

通过tryAcquiredShared获取资源,如果没有获取到,执行doAcquireShared

4.2 tryAcquireShared

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

类似于tryacquire,抽象类中直接抛出异常

4.3 doAcquireShared

 private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        //共享节点加到队列
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            //获取node的前继节点
                final Node p = node.predecessor();
                if (p == head) {
                如果前继是head,看能不能获取资源
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                       //如果还有资源,要唤醒下一个等待节点 setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                20             //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()

                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这个函数试图获取资源,并且将head设置为signal,表示等他释放资源了就能唤醒它的next。这里的资源只能是head.next获取,体现了公平性
4.4 setHeadAndPropagate

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//node设置为head
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
            //释放head的资源
                doReleaseShared();
        }
    }

4.5 总结
这个非独占式的获取跟acquire区别不是很大,无非就是线程拿到资源之后如果发现资源有剩余,会唤醒下一个正在等待的线程,而且唤醒的是临近等待的线程,体现了公平性,但是性能会差一点

5. releaseShared

  • 这个跟独占模式下的释放区别不大,主要也是释放资源,唤醒线程。区别主要在于独占模式需要state=0之后才唤醒其他线程,而非独占模式只要剩余资源就回去唤醒线程

相关文章

网友评论

      本文标题:AQS源码分析

      本文链接:https://www.haomeiwen.com/subject/hvfjwqtx.html