美文网首页
2021-04-11_AQS锁互斥源码学习笔记总结

2021-04-11_AQS锁互斥源码学习笔记总结

作者: kikop | 来源:发表于2021-04-11 23:06 被阅读0次

    20210411_AQS锁互斥源码学习笔记总结

    1概述

    AQS是一个用来构建锁和同步器的框架,Lock包中的锁(ReentrantLock独占模式、ReadWriteLock)、Semaphore共享模式、CoundDownLoatch、Jdk之前的FutureTask等均基于AQS来构建。

    本文基于源码进行相关知识点进行总结。

    1.1主要知识点

    1. 基于NoFaire非公平、重入锁ReentrantLock,模拟3个线程,第一个线程比较耗时。

    2. 后续2个线程首先尝试获取锁

      public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
      

      2.1.如果tryAcquire获取锁成功,则执行业务处理。

      2.2.如果tryAcquire获取锁失败,则创建独占模式的Node节点会进入行FIFO双向队列,即addWaiter。然后走基类AQS中的acquireQueued(注意加到队列中的节点都是按顺序去获取锁,判断是否是头结点)。

      2.3.如果是当前节点的前驱节点为head,则有一次机会再次尝试获取锁tryAcquire,如果获取锁成功,则执行业务处理。

      否则,并Park阻塞。

      // C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;
          try {
              boolean interrupted = false;
              for (;;) {
                  final Node p = node.predecessor();
                  if (p == head && tryAcquire(arg)) {
                      setHead(node);
                      p.next = null; // help GC
                      failed = false;
                      return interrupted;
                  }
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      parkAndCheckInterrupt())
                      interrupted = true;
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      
    1. 等待第一个线程执行完毕,会通知unPark同步器中的队列首个线程节点。

    2. 加锁lock源码分析。

    3. 解锁unlock源码分析。

    image-20210411204836987.png

    AQS数据结构图

    image-20210411230021227.png

    2代码示例

    package com.kikop.myjuclockstudy.myaqs.myreentrantlock;
    
    import com.kikop.util2.MyDateUtil;
    import com.kikop.util2.RandomUtil;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: technicalskill
     * @file Name: AtomicDemoTest
     * @desc 功能描述 ReentrantLock:默认非公平独占锁、可重入锁、独占锁、可中断锁
     * @date 2020/6/7
     * @time 17:47
     * @by IDE: IntelliJ IDEA
     */
    public class MyReenLockSimpleTest {
    
        // static存存在jvm元数据区
        // 加入FIFO队列策略:谁先加入,完全由Cpu时间片切换决定
        // FIFO队列节点:谁是第一个头节点先执行:判断前面是否有头结点
        private static Lock lock = new ReentrantLock(); // ReentrantLock:默认非公平独占锁、可重入锁、独占锁、可中断锁
    
        // 线程个数
        private static int THREAD_COUNT = 3;
    
        // 临界区资源
        private static int globalVar = 0;
    
    
        public static void inc() {
    
            try {
                System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":申请锁,即将加入FIFO队列...");
                lock.lock();  // 加锁
                System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":申请锁-->获得锁成功!");
    
                System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":开始业务逻辑执行.");
                String currentThreadName = Thread.currentThread().getName();
                if ("T1".equalsIgnoreCase(currentThreadName)) { // 模拟第一个线程耗时较长时间:1分钟,后续线程将如队列
                    Thread.sleep(3 * 60 * 1000);
                } else {
                    Thread.sleep(RandomUtil.getSpecialRangeRandomValue(100));
                }
                globalVar++;
                System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":完成业务逻辑执行.");
            } catch (InterruptedException e) {
                System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":---释放锁异常!");
                e.printStackTrace();
            } finally {
                System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":---释放锁!");
                lock.unlock(); // 释放锁
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            Thread[] threads = new Thread[THREAD_COUNT];
    
            for (int i = 0; i < THREAD_COUNT; i++) {
                threads[i] =
                        new Thread(() -> {
                            inc();
                        }, String.format("T%s", i + 1)
                        );
            }
    
            for (int i = 0; i < THREAD_COUNT; i++) {
                threads[i].start();
                Thread.sleep(100);
            }
    
            TimeUnit.MINUTES.sleep(30);
            System.out.println("Result:" + globalVar);
        }
    }
    

    3NoFair源码分析

    3.1AQS同步器初始化

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
    
        } catch (Exception ex) { throw new Error(ex); }
    }
    

    3.2双向FIFO队列结构

    首先,当T2,T3线程入队列后,(小技巧:断点加到 lock.unlock() 上一行),Sync同步器节点结构如下图:

    image-20210411192219099.png

    当前执行线程exclusiveOwnerThread为:T1,state=1表示当前锁被使用中。

    image-20210411192507543.png

    查看此时的head节点(prev=null,waitStatus=-1,持有线程:null)

    image-20210411192723536.png

    查看此时的head节点的next节点(prev=null,waitStatus=-1,持有线程:T2)

    image-20210411192814560.png

    查看此时的head节点的next节点(prev=T2持有,waitStatus=-1,持有线程:T3)

    image-20210411192553272.png

    查看此时的tail节点(prev=T2持有,waitStatus=0,持有线程:T3)

    3.3加锁lock源码分析

    image-20210411230105933.png
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1); // 获取锁lock()失败,非阻塞等待,释放CPU资源,执行 acquire(1)
        }
    
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    

    3.3.1acquire

    // AbstractQueuedSynchronizer.java
    public final void acquire(int arg) {
        // 1.再次尝试获取锁 tryAcquire
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // acquireQueued::for (;;) {,这里将阻塞,等待 unPark唤醒 
            // 2.尝试获取锁失败, 则addWaiter,将节点加到FIFO队列( 默认独占节点)
            // 3.acquireQueued
            selfInterrupt(); // 4.获取锁成功,线程进行自我中断
    }
    
    // NonfairSync
    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    // FairSync
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    3.3.1.1addWaiter

    // AbstractQueuedSynchronizer.java
    
    /**
         * Tail of the wait queue, lazily initialized.  Modified only via
         * method enq to add new wait node.
         */
    private transient volatile Node tail;
    
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node); // 首次入队初始化 init(),先构建一个FIFO空节点
        return node; // 返回当前New节点
    }
    
    3.3.1.1.1enq

    tail为null时,表示首次,需完成 FIFO链表的初始化

    第二次将参数:Node节点入队列。

    private Node enq(final Node node) {
        for (;;) { // 无限循环
            Node t = tail;
            if (t == null) { // 第一次循环,Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { // 第二次循环
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    3.3.1.2acquireQueued

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // FIFO队列按顺序,否则节点变化太频繁,
                    // 判断node的头节点是否为head,若果是head且获取锁成功,则设置head=node
                    setHead(node);
                    p.next = null; // 断开head,help GC
                    failed = false;
                    return interrupted;
                }
                // FailedAcquire 未获取到锁
                // 将pred前一节点 waitStatus由默认值改为 Node.SIGNAL
                // 修改完成后则 parkAndCheckInterrupt
                // 等待某个时刻被唤醒后,唤醒后执行 Thread.interrupted,表示线程被中断唤醒(同时清除标志位)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    3.1.1.2.1shouldParkAfterFailedAcquire
    // 该值默认0从达到小,CANCELLED SIGNAL CONDITION PROPAGATE
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    
    volatile int waitStatus;
    

    这里我们需重点分析一下 队列中除tail节点 pred Node waitStatus的流转流程,该值默认0-->-1,表示后面节点需要唤醒。

    // init:
    // pred:前驱节点,init head节点:0
    // node:当前节点:0
    // exec result:
    // pred:前驱节点,init head节点:0
    // node:当前节点:0(最后一个节点都是0)
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 设置头节点waitStatus:-1,unlock时会用到,具体看3.2节
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    3.1.1.2.2parkAndCheckInterrupt
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    // 判断当前线程是否被中断,并且清除中断标志位
    public static boolean interrupted() {
        return currentThread().isInterrupted(true);
    }
    

    3.1.1.3selfInterrupt

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    
    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();
    
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
    

    3.4解锁unlock源码分析

    image-20210411230148669.png
    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 释放锁成功(本质就是修改标志位)
            Node h = head; // 每次都是从head节点开始遍历
            if (h != null && h.waitStatus != 0) // h.waitStatus=-1
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    3.4.1tryRelease

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // c==0,表示可以释放锁了
        if (Thread.currentThread() != getExclusiveOwnerThread()) // 不能释放别的线程的锁
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null); // 清空 AQS.exclusiveThread
        }
        setState(c); //设置AQS.state=0
        return free;
    }
    
    3.4.1.1.1unparkSuccessor

    T1线程业务处理完成,唤醒后继节点,这里即T2线程。

    // node为当前AQS的head头节点
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus; // 头结点 waitStatus当前为:-1
        if (ws < 0)  // 设置head Node waitStatus:-1--> 0,表示后继节点唤醒流程引导完成
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; // waitStatus 初始化完成后,基本上都是 -1 -1 0。
        if (s == null || s.waitStatus > 0) { // 防止节点线程自我取消了。
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) // 这里唤醒线程 AQS 中的第一个非空节点T2
            LockSupport.unpark(s.thread);
    }
    
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread); // 回到:3.1.1.2.2,即return Thread.interrupted();
    }
    

    4总结

    4.1公平与非公平锁本质区别分析

    公平与非公平锁本质就是在调用基类AQS中的acquire方法内部tryAcquire方法,会根据子类AQS子类Sync、NonfairSync去调用不同的实现。

    // C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    公平锁:tryAcquire:hasQueuedPredecessors

    // FairSync
    final void lock() {
        acquire(1);
    }
    
    // C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    非公平锁:

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    // C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    4.2锁同步

    AQS数据结构为FIFO双向链表。

    条件变量Condition的wait,signal等价于Jdk原生对象Object的wait,Notify,NotifyAll。

    ConditionObject可构建多个等待队列,Lock(同步队列Q1)和Condition(条件等待队列Q2),其实就是两个队列的互相移动。

    相关文章

      网友评论

          本文标题:2021-04-11_AQS锁互斥源码学习笔记总结

          本文链接:https://www.haomeiwen.com/subject/bpzzkltx.html