美文网首页
JAVA并发(9)——AQS介绍

JAVA并发(9)——AQS介绍

作者: alexwu59 | 来源:发表于2018-06-28 22:37 被阅读0次

    AQS概述
    AbstractQueuedSynchronizer(AQS)是一个提供基础框架,JDK提供的Lock是通过AQS框架完成,程序员也可以利用AQS实现自己的锁。以JDK提高的ReentrantLock为例,如果创建了一个ReentrantLock类的对象lock,lock对象中就包含了AQS的一个子类的实例sync。
    AQS的大致逻辑是:客户端代码在执行lock.lock()的方法的时候,当前线程会去检测AQS的锁标志(同步状态)来判断锁是否可以被持有,如果锁被其他线程占用,那么当前线程会被放到AQS的等待队列中。检测AQS所标志位,其实就是检查lock对象中sync变量中的锁标志位,AQS的等待队列也就是sync中的等待队列。
    程序员可以使用AQS提供的getState(),setState()和compareAndSetState()方法来检查和修改AQS中锁标志。程序员只需要重写如下方法:
    下面方法是排它锁

    • tryAcquire(int)
    • tryRelease(int)
      下面方法是共享锁
    • tryAcquireShared(int)
    • tryReleaseShared(int)
    • isHeldExclusively()
      AQS的其他方法都是final,不需要改变,AQS本质上不是个抽象类。

    AQS实现原理

    AQS保护了一个FIFO队列,该队列是一个双向列表,实现队列的数据结构是定义在AQS中的Node类。该队列主要保存获取锁失败的线程。AQS中的基本字段:

    //指向FIFO列表的头结点
    private transient volatile Node head;
    //指向FIFO列表的尾结点
    private transient volatile Node tail;
    /**同步状态 0表示锁可以被线程获取 1表示锁被其他线程持有
    *线程在执行lock方法的时候,就是通过判断和修改该值来申请锁
    /*
    private volatile int state;
    //该属性是AQS父类的字段,记录了独占锁的线程
    private transient Thread exclusiveOwnerThread;
    

    介绍AQS的实现原理,需要从它的某个子类开始,这里以ReentrantLock为例。首先定义一个需要同步的测试类

    class LockTest{
        Lock lock = new ReentrantLock();
        public void p(){
            try {
                lock.lock();
                while (true){
                }
            }
            finally {
                lock.unlock();
            }
        }
    }
    

    然后定义访问同步代码的线程类:

    class LockA extends Thread{
        LockTest lockTest;
        public LockA(LockTest l,String name){
            super(name);
            lockTest = l;
        }    
        @Override
        public void run() {
            lockTest.p();
        }
    }
    

    下面是测试函数:

    public static void main(String[] args) throws Exception{
            LockTest lt = new LockTest();
            LockA lockA = new LockA(lt,"A");
            LockA lockB = new LockA(lt,"B");
            LockA lockC = new LockA(lt,"C");
            LockA lockD = new LockA(lt,"D");       
            lockA.start();
            //让线程A先启动
            Thread.sleep(500);
            lockB.start();
            lockC.start();
            lockD.start();
    }
    

    用idea进行断点调试,首先调用lockA.start(),让lockA线程进入临界区,再进入临界区之前,会调用lock.lock()方法先获取锁。lock.lock()的实现如下:

    final void lock() {
                //CAS更新state状态值
                if (compareAndSetState(0, 1))
                   //由于ReenterLock是独占锁,下面的是调用AQS父类的方法去设置当前拥有独占锁的线程  setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
    }
    

    compareAndSetState是CAS操作,来判断AQS中state的状态是否为0,如果是0则更新为1,因为lockA第一个进入临界区的线程,那么它执行compareAndSetState会成功,然后就会去执行if语句的其他操作:拥有独占锁的线程。如果compareAndSetState()操作失败,那么将会去执行acquire(1),这个方法的逻辑等一下lockB线程的时候再分析。
    lockA执行完setExclusiveOwnerThread方法后,获取锁完成,开始执行临界区代码。
    下面lockB线程启动,同样调用lock.lock()方法来获取锁,由于lockA正在执行临界区代码,锁没有释放,因此lockB线程在执行compareAndSetState操作时就会失败,转而去执行acquire(1);下面是acquire方法的代码:

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    

    该方法执行逻辑:
    1.如果tryAcquire方法返回false,然后去执行2;返回true,方法退出

    • addWaiter方法是向FIFO列表中增加一个等待节点,然后执行3
    • acqd如果返回true,那么执行4,否则退出

    tryAcquire方法逻辑

    tryAcquire方法是AQS提供的需要实现者重新的方法,用于以独占式获取锁,在ReenterLock中,tryAcquire的实现如下

    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
    }
    

    nonfairTryAcquire(acquires)方法实现如下:

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                //再次获取AQS锁状态,如果此时之前的线程已经执行完成,lockB可以尝试调用
                //compareAndSetState方法获取锁,入股返回true那么lockB线程就不需要加入到FIFO队列
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }//如果获取锁状态不为0,然后检查当前持有锁的线程是不是当前执行的线程,
                //如果是同一个线程,那么就把AQS的锁状态值加上acquires
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                //如果compareAndSetState执行失败或者current != getExclusiveOwnerThread()
                //那么tryAcquire方法返回false,然后去执行把当前线程加入到FIFO列表中的逻辑
                return false;
            }
    

    addWaiter方法逻辑

    当tryAcquire方法返回false,然后去执行addWaiter(Node.EXCLUSIVE)方法,Node.EXCLUSIVE值为null

    /** 该值表示创建的节点是独占式的 */
    static final Node EXCLUSIVE = null;
    

    addWaiter方法的逻辑如下:

         /**
         * 为当前线程创建了一个FIFO队列的节点,并且需要给节点设置锁类型
         *
         * 参数mode:Node.EXCLUSIVE表示排他锁,Node.SHARED表示共享锁
         */
        private Node addWaiter(Node mode) {
            //用当前线程和锁类型创建一个Node对象
            Node node = new Node(Thread.currentThread(), mode);
            
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    Node是AQS的内部类,它主要包括:

            //FIFO列表等待线程的锁模式:共享锁
            static final Node SHARED = new Node();
            //FIFO列表等待线程的锁模式:独占锁
            static final Node EXCLUSIVE = null;
            //线程状态 取消执行
            static final int CANCELLED =  1;
            //线程状态 可以接受被唤起操作
            static final int SIGNAL    = -1;
            //线程状态 等待CONDITION条件
            static final int CONDITION = -2;
            //线程状态 等待CONDITION条件并且无条件传播
            static final int PROPAGATE = -3;
            //线程状态
            volatile int waitStatus;
            //前序节点
            volatile Node prev;
            //后续节点
            volatile Node next;
            //保存当前线程
            volatile Thread thread;
            //节点锁模式
            Node nextWaiter;
    
    

    由于lockB在执行的时候,head和tail都是null,因此if (pred != null) 不会执行,直接执行enq方法:

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { 
                    //如果AQS尾节点是null,说明AQS中的FIFO对里是空的,
                    //需要创建一个空的Node对象,作为FIFO的头结点
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } 
                //如果tail不为null,把当前AQS的尾节点赋给新创建的node节点的前序节点,
                //然后调用CAS把新创建的node设置为AQS的尾节点
                else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
    }
    

    <font color=red>注:enq方法为AQS设置FILO的头结点和尾节点,使用CAS+死循环,直到设置成功。因为当一个线程在调用CAS设置的时候,可能另外一个线程已经调用CAS成功返回,因此会出现调用失败的,如果调用失败,需要循环重新设置。enq只有在成功设置了tail之后才会返回</font>
    由于本次调试是用多线程debug模式,因此,不会出现多个线程同时执行CAS的情况,因此当lockB执行到compareAndSetHead会成功返回,这样就创建了一个空的头结点,然后把头结点对象赋值给tail,此时FIFO队列状态如下图:


    本次循环操作完成之后,因为是循环,没有退出循环,然后再执行一次循环体,这次因为tail!=null,所以会执行else操作,使用CAS把lockB代表的node设置为tail,然后返回。操作之后的FIFO如下图:

    addWaiter方法执行完成,线程被放到FIFO列表中。下面是执行acquireQueued方法:

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //获取当前线程的prev节点
                    final Node p = node.predecessor();
                    //如果p是头结点,那么让当前线程再尝试获取一次AQS节点锁状态,如果获成功,
                    //那么把当前线程所在的node设置为head,并且把node节点其prev、next,thread
                    //都变为null
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
    }
    

    根据上面源码,p是node的前序节点,根据当前程序运行状态p==head,但是tryAcquire方法执行返回false。然后执行shouldParkAfterFailedAcquire方法,该方法有两个逻辑,如果当前节点的前序节点p.waitStatus=SIGNAL,那么返回true;如果是大于0,那么需要把这些大于0的node从FIFO中移除(按照从后往前遍历的方法)并返回false。如果是0,把当前线程锁在的node节点前序节点的waitStatus设置为SIGNAL(-1),返回false。

    如果shouldParkAfterFailedAcquire返回true,然后去执行parkAndCheckInterrupt()。本测试程序,此时返回是false。此刻FIFO队列如图:

    程序返回后,接着循环开始处执行。此刻当程序重新执行到shouldParkAfterFailedAcquire方法是因为p节点的waitStatus=-1,因此返回true。然后程序接着执行parkAndCheckInterrupt()方法:

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
    }
    

    调用LockSupport.park(this),当前线程会被所阻塞一直到调用LockSupport.uppark()方法唤起。

    接着执行测试程序的lockC.start();第三个线程启动。第三个线程经过上面的步骤后执行到acquireQueued()时,FIFO队列的状态如下图:

    然后执行到shouldParkAfterFailedAcquire方法,由于当前线程是lockC,那么他的前序节点是lockB,此时lockB的waitStatus=0,因此shouldParkAfterFailedAcquire执行修改lockB的waitStatus=-1的操作,然后方法返回false,然后在从循环开始处执行。然后又会执行到shouldParkAfterFailedAcquire方法,此时由于lockC的前序节点lockB的waitStatus==-1直接返回true,然后当前线程lockC执行parkAndCheckInterrupt()把自己阻塞。

    下面分析锁的释放。锁释放是持有线程执行:

    finally {
            lock.unlock();
    }
    

    unlock方法调用的sync.release(1);此方法的具体实现如下:

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
    }
    

    tryRelease()方法是在ReenterLock类中的Sync内部实现,他主要逻辑是用当前AQS的锁状态-1的差值是否与0相等,如果不等于0,那说明还需要等其他线程调用unLock(),tryRelease()返回false,这个差值计算是为了实现共享锁的使用。直到AQS的锁状态与1的差值为0,然后返回true

    protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
    }
    
    

    当该方法返回true之后,执行unparkSuccessor(h)这里参数h是FIFO列表的头结点,具体代码逻辑如下:

      private void unparkSuccessor(Node node) {
            //获取头结点的锁状态信息
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //把头结点的后继节点所存储的线程从阻塞状态中释放出来
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    

    当执行到LockSupport.unpark(s.thread)时,FIFO第一个阻塞线程会从阻塞状态中返回,也就是调用LockSupport.park()方法返回,执行后面代码,然后开始执行acquireQueued方法。此方法逻辑上面已经分析过了。
    到此,AQS基本加锁解锁已经分析完成。

    相关文章

      网友评论

          本文标题:JAVA并发(9)——AQS介绍

          本文链接:https://www.haomeiwen.com/subject/dvrbyftx.html