美文网首页
线程并发--AQS抽象队列同步器

线程并发--AQS抽象队列同步器

作者: 叩丁狼教育 | 来源:发表于2019-01-04 11:06 被阅读94次

    本文作者:黄海燕,叩丁狼高级讲师。原创文章,转载请注明出处。

    AQS队列同步器英文全称AbstractQueuedSynchronizer,这是一个抽象类,为什么我们今天需要学习这个抽象类呢?这个抽象类它的神奇之处到底是什么呢?我们一起来掀开它的神奇面纱吧!

    什么是AQS(AbstractQueuedSynchronizer)?

    AQS关系.png

    AQS中文翻译为同步器,Lock接口的实现类基本都是通过包含AQS子类对象来完成线程访问控制的。比如说Lock中的获取锁和释放锁操作。说白了就是一把抽象的锁。

    Lock通过调用AQS子类的lock方法实现获取锁,unlock方法实现释放锁。

    lock方法.png unlock方法.png

    1.1 AQS源码分析

    1.1.1类字段分析:

        private transient volatile Node head;
        private transient volatile Node tail;
        private volatile int state;
    
        protected final int getState() {
            return state;
        }
    
        protected final void setState(int newState) {
            state = newState;
        }
    
        protected final boolean compareAndSetState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    1)我们可以看到字段中其实最主要的就是一个volatile 修饰的state(状态),state用于标记锁状态。

    2)compareAndSetState方法是使用CAS算法修改state保证state修改时的数据安全。

    3)head和tail表示队列的头和尾,也就是锁该类将线程和线程状态封装成一个节点Node存入到同步队列中,结构如下:

    叩丁狼教育.png

    AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

    1.1.2方法分析:

    叩丁狼教育.png

    AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

    tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

    tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

    tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

    tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

    再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

    1.1.3锁类型分析:

    叩丁狼教育.png

    1)公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁。 2)非公平锁保证:老的线程排队使用锁;但是无法保证新线程抢占已经在排队的线程的锁。

    注意:ReentrantLock初始化默认为非公平锁,非公平锁减少了线程的挂起和恢复运行效率高于公平锁。公平锁可以解决饥饿发生。

    1.1.4获取锁操作分析:

    final void lock() {//获取锁
        if (compareAndSetState(0, 1))//获取锁成功
            setExclusiveOwnerThread(Thread.currentThread());//独占锁
        else
            acquire(1);//其他线程尝试获取锁
    }
    
    
    acquire****方法:
    叩丁狼教育.png

    1)tryAcquire()尝试直接去获取资源,如果成功则直接返回;

    2)addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

    3)acquireQueued()****方法;重新竞争同步锁

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//自旋
                final Node p = node.predecessor();//获取当前节点的上一个节点
                if (p == head && tryAcquire(arg)) {//上一节点为头节点和获取尝试获取锁成功
                    setHead(node);//上一个节点已经获取到锁,将当前阶段设置为头节点,下一次就由自己获取锁
                    p.next = null; // 头节点获取到锁离开队列
                    failed = false;
                    return interrupted;//返回中断状态
                }
                 //不是头节点或者获取不到锁
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    
    
    shouldParkAfterFailedAcquire****方法:
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)//如果上一节点被唤醒,当前节点就进入等待
            return true;
        if (ws > 0) {//如果上一节点线程被取消了
            do {
                node.prev = pred = pred.prev;//当前阶段尝试跳过上一节点,插个队
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//尝试将上一个节点设置为唤醒状态
        }
        return false;
    }
    
    parkAndCheckInterrupt****方法:
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//线程进入等待状态
        return Thread.interrupted();//线程唤醒后,检查自己是否真的处于中断状态,注意:interrupted会清除中断状态
    }
    

    1)selfInterrupt(),自我中断,当前线程调用了interrupt方法

    流程图如下:

    叩丁狼教育.png

    16.1.5 释放锁操作分析

    release****方法:
    public final boolean release(int arg) {
        if (tryRelease(arg)) {//尝试释放锁
            Node h = head;
            if (h != null && h.waitStatus != 0)//头节点不为null并且等待状态不是为没有状态
                unparkSuccessor(h);//唤醒头节点的后面一个节点
            return true;
        }
        return false;
    }
    
    tryRelease****方法:
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;//重入锁state大于1,需要调用两次unlock方法,释放两次
        if (Thread.currentThread() != getExclusiveOwnerThread())//如果独占锁不是当前需要释放的线程,抛出异常
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);//独占锁设置为null,释放锁
        }
        setState(c);
        return free;
    }
    
    unparkSuccessor****方法:
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)//等待状态,不是被取消的状态
            compareAndSetWaitStatus(node, ws, 0);//CAS修改回到最初的状态,没有等待的状态下
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//各种等待情况下
            s = null;
               //从队尾开始找不为空和不是当前节点的节点,直到找到当前节点的后面一个节点位置
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)//如果t为等待状态下
                    s = t;//s设置为t,拿到t这个节点
        }
        if (s != null)//唤醒当前节点的下一个节点
            LockSupport.unpark(s.thread);
    }
    

    想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

    相关文章

      网友评论

          本文标题:线程并发--AQS抽象队列同步器

          本文链接:https://www.haomeiwen.com/subject/scdhrqtx.html