美文网首页
AbstractQueuedSynchronizer的原理与应用

AbstractQueuedSynchronizer的原理与应用

作者: 小心我的code | 来源:发表于2020-08-05 19:03 被阅读0次

    介绍:

    AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

    1.ReentrantLock

    1.1 ReentrantLock特性概览

    ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。为了帮助大家更好地理解ReentrantLock的特性,我们先将ReentrantLock跟常用的Synchronized进行比较,其特性如下(蓝色部分为本篇文章主要剖析的点):


    ReentrantLock与Synchronized对比.png

    Synchronized的使用方式

    // 1.用于代码块
    synchronized (this) {}
    // 2.用于对象
    synchronized (object) {}
    // 3.用于方法
    public synchronized void test () {}
    // 4.可重入
    for (int i = 0; i < 100; i++) {
        synchronized (this) {}
    }
    

    ReentrantLock的使用方式

    public void test () throw Exception {
        // 1.初始化选择公平锁、非公平锁
        ReentrantLock lock = new ReentrantLock(true);
        // 2.可用于代码块
        lock.lock();
        try {
            try {
                // 3.支持多种加锁方式,比较灵活; 具有可重入特性
                if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
            } finally {
                // 4.手动释放锁
                lock.unlock()
            }
        } finally {
            lock.unlock();
        }
    }
    

    1.2 ReentrantLock与AQS的关联

    非公平锁源码中的加锁流程如下:

    // java.util.concurrent.locks.ReentrantLock#NonfairSync
    
    // 非公平锁
    static final class NonfairSync extends Sync {
        ...
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
            }
      ...
    }
    

    这块代码的含义为:

    • 若通过CAS设置变量State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。
    • 若通过CAS设置变量State(同步状态)失败,也就是获取锁失败,则进入Acquire方法进行后续处理。

    第一步很好理解,但第二步获取锁失败后,后续的处理策略是怎么样的呢?这块可能会有以下思考:

    • 某个线程获取锁失败的后续流程是什么呢?有以下两种可能:
      (1) 将当前线程获锁结果设置为失败,获取锁流程结束。这种设计会极大降低系统的并发度,并不满足我们实际的需求。所以就需要下面这种流程,也就是AQS框架的处理流程。
      (2) 存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。
    • 对于问题1的第二种情况,既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
    • 处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
    • 如果处于排队等候机制中的线程一直无法获取锁,还是需要一直等待吗,还是有别的策略来解决这一问题?
      带着非公平锁的这些问题,再看下公平锁源码中获锁的方式:
    // java.util.concurrent.locks.ReentrantLock#FairSync
    
    static final class FairSync extends Sync {
      ...  
        final void lock() {
            acquire(1);
        }
      ...
    }
    

    看到这块代码,我们可能会存在这种疑问:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?

    结合公平锁和非公平锁的加锁流程,虽然流程上有一定的不同,但是都调用了Acquire方法,而Acquire方法是FairSync和UnfairSync的父类AQS中的核心方法。

    对于上边提到的问题,其实在ReentrantLock类源码中都无法解答,而这些问题的答案,都是位于Acquire方法所在的类AbstractQueuedSynchronizer中,也就是本文的核心——AQS。

    2 AQS

    首先,我们通过下面的架构图来整体了解一下AQS框架:


    AQS框架.png
    • 上图中有颜色的为Method,无颜色的为Attribution。

    • 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。

    • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
      下面我们会从整体到细节,从流程到方法逐一剖析AQS框架,主要分析过程如下:


      AQS框架.png

    2.1 原理概览

    AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
    CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
    主要原理图如下:


    CLH变体队列.png

    AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。

    2.2AQS与ReentrantLock的关联

    非公平锁加解锁过程


    非公平锁加解锁过程.png

    加锁:

    • 通过ReentrantLock的加锁方法Lock进行加锁操作。
    • 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
    • AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
    • tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。

    解锁:

    • 通过ReentrantLock的解锁方法Unlock进行解锁。
    • Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
    • Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
    • 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。

    通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。


    image.png

    3 AQS应用

    3.1 ReentrantLock的可重入应用

    ReentrantLock的可重入性是AQS很好的应用之一,在了解完上述知识点以后,我们很容易得知ReentrantLock实现可重入的方法。在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。
    公平锁:

    // java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
    
    if (c == 0) {
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    

    非公平锁:

    // java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
    
    if (c == 0) {
        if (compareAndSetState(0, acquires)){
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    

    从上面这两段都可以看到,有一个同步状态State来控制整体可重入的情况。State是Volatile修饰的,用于保证一定的可见性和有序性。

    // java.util.concurrent.locks.AbstractQueuedSynchronizer
    private volatile int state;
    

    接下来看State这个字段主要的过程:
    1.State初始化的时候为0,表示没有任何线程持有锁。
    2.当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
    3.解锁也是对这个字段-1,一直到0,此线程对锁释放。

    3.2 JUC中的应用场景

    同步工具 同步工具与AQS的关联
    ReentrantLock 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
    Semaphore 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。
    CountDownLatch 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。
    ReentrantReadWriteLock 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。
    ThreadPoolExecutor Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。

    3.3 自定义同步工具

    了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。

    public class LeeLock  {
    
        private static class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire (int arg) {
                return compareAndSetState(0, 1);
            }
    
            @Override
            protected boolean tryRelease (int arg) {
                setState(0);
                return true;
            }
    
            @Override
            protected boolean isHeldExclusively () {
                return getState() == 1;
            }
        }
        
        private Sync sync = new Sync();
        
        public void lock () {
            sync.acquire(1);
        }
        
        public void unlock () {
            sync.release(1);
        }
    }
    

    通过我们自己定义的Lock完成一定的同步功能。

    public class LeeMain {
    
        static int count = 0;
        static LeeLock leeLock = new LeeLock();
    
        public static void main (String[] args) throws InterruptedException {
    
            Runnable runnable = new Runnable() {
                @Override
                public void run () {
                    try {
                        leeLock.lock();
                        for (int i = 0; i < 10000; i++) {
                            count++;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        leeLock.unlock();
                    }
    
                }
            };
            Thread thread1 = new Thread(runnable);
            Thread thread2 = new Thread(runnable);
            thread1.start();
            thread2.start();
            thread1.join();
            thread2.join();
            System.out.println(count);
        }
    }
    

    上述代码每次运行结果都会是20000。通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。

    参考:
    https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

    相关文章

      网友评论

          本文标题:AbstractQueuedSynchronizer的原理与应用

          本文链接:https://www.haomeiwen.com/subject/ggrwrktx.html