美文网首页
2021-04-11_AQS锁互斥源码学习笔记总结

2021-04-11_AQS锁互斥源码学习笔记总结

作者: kikop | 来源:发表于2021-04-11 23:06 被阅读0次

20210411_AQS锁互斥源码学习笔记总结

1概述

AQS是一个用来构建锁和同步器的框架,Lock包中的锁(ReentrantLock独占模式、ReadWriteLock)、Semaphore共享模式、CoundDownLoatch、Jdk之前的FutureTask等均基于AQS来构建。

本文基于源码进行相关知识点进行总结。

1.1主要知识点

  1. 基于NoFaire非公平、重入锁ReentrantLock,模拟3个线程,第一个线程比较耗时。

  2. 后续2个线程首先尝试获取锁

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    2.1.如果tryAcquire获取锁成功,则执行业务处理。

    2.2.如果tryAcquire获取锁失败,则创建独占模式的Node节点会进入行FIFO双向队列,即addWaiter。然后走基类AQS中的acquireQueued(注意加到队列中的节点都是按顺序去获取锁,判断是否是头结点)。

    2.3.如果是当前节点的前驱节点为head,则有一次机会再次尝试获取锁tryAcquire,如果获取锁成功,则执行业务处理。

    否则,并Park阻塞。

    // C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
  1. 等待第一个线程执行完毕,会通知unPark同步器中的队列首个线程节点。

  2. 加锁lock源码分析。

  3. 解锁unlock源码分析。

image-20210411204836987.png

AQS数据结构图

image-20210411230021227.png

2代码示例

package com.kikop.myjuclockstudy.myaqs.myreentrantlock;

import com.kikop.util2.MyDateUtil;
import com.kikop.util2.RandomUtil;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author kikop
 * @version 1.0
 * @project Name: technicalskill
 * @file Name: AtomicDemoTest
 * @desc 功能描述 ReentrantLock:默认非公平独占锁、可重入锁、独占锁、可中断锁
 * @date 2020/6/7
 * @time 17:47
 * @by IDE: IntelliJ IDEA
 */
public class MyReenLockSimpleTest {

    // static存存在jvm元数据区
    // 加入FIFO队列策略:谁先加入,完全由Cpu时间片切换决定
    // FIFO队列节点:谁是第一个头节点先执行:判断前面是否有头结点
    private static Lock lock = new ReentrantLock(); // ReentrantLock:默认非公平独占锁、可重入锁、独占锁、可中断锁

    // 线程个数
    private static int THREAD_COUNT = 3;

    // 临界区资源
    private static int globalVar = 0;


    public static void inc() {

        try {
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":申请锁,即将加入FIFO队列...");
            lock.lock();  // 加锁
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":申请锁-->获得锁成功!");

            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":开始业务逻辑执行.");
            String currentThreadName = Thread.currentThread().getName();
            if ("T1".equalsIgnoreCase(currentThreadName)) { // 模拟第一个线程耗时较长时间:1分钟,后续线程将如队列
                Thread.sleep(3 * 60 * 1000);
            } else {
                Thread.sleep(RandomUtil.getSpecialRangeRandomValue(100));
            }
            globalVar++;
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":完成业务逻辑执行.");
        } catch (InterruptedException e) {
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":---释放锁异常!");
            e.printStackTrace();
        } finally {
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":---释放锁!");
            lock.unlock(); // 释放锁
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Thread[] threads = new Thread[THREAD_COUNT];

        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i] =
                    new Thread(() -> {
                        inc();
                    }, String.format("T%s", i + 1)
                    );
        }

        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i].start();
            Thread.sleep(100);
        }

        TimeUnit.MINUTES.sleep(30);
        System.out.println("Result:" + globalVar);
    }
}

3NoFair源码分析

3.1AQS同步器初始化

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

3.2双向FIFO队列结构

首先,当T2,T3线程入队列后,(小技巧:断点加到 lock.unlock() 上一行),Sync同步器节点结构如下图:

image-20210411192219099.png

当前执行线程exclusiveOwnerThread为:T1,state=1表示当前锁被使用中。

image-20210411192507543.png

查看此时的head节点(prev=null,waitStatus=-1,持有线程:null)

image-20210411192723536.png

查看此时的head节点的next节点(prev=null,waitStatus=-1,持有线程:T2)

image-20210411192814560.png

查看此时的head节点的next节点(prev=T2持有,waitStatus=-1,持有线程:T3)

image-20210411192553272.png

查看此时的tail节点(prev=T2持有,waitStatus=0,持有线程:T3)

3.3加锁lock源码分析

image-20210411230105933.png
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1); // 获取锁lock()失败,非阻塞等待,释放CPU资源,执行 acquire(1)
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

3.3.1acquire

// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    // 1.再次尝试获取锁 tryAcquire
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // acquireQueued::for (;;) {,这里将阻塞,等待 unPark唤醒 
        // 2.尝试获取锁失败, 则addWaiter,将节点加到FIFO队列( 默认独占节点)
        // 3.acquireQueued
        selfInterrupt(); // 4.获取锁成功,线程进行自我中断
}
// NonfairSync
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
// FairSync
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

3.3.1.1addWaiter

// AbstractQueuedSynchronizer.java

/**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
private transient volatile Node tail;

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node); // 首次入队初始化 init(),先构建一个FIFO空节点
    return node; // 返回当前New节点
}
3.3.1.1.1enq

tail为null时,表示首次,需完成 FIFO链表的初始化

第二次将参数:Node节点入队列。

private Node enq(final Node node) {
    for (;;) { // 无限循环
        Node t = tail;
        if (t == null) { // 第一次循环,Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { // 第二次循环
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

3.3.1.2acquireQueued

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // FIFO队列按顺序,否则节点变化太频繁,
                // 判断node的头节点是否为head,若果是head且获取锁成功,则设置head=node
                setHead(node);
                p.next = null; // 断开head,help GC
                failed = false;
                return interrupted;
            }
            // FailedAcquire 未获取到锁
            // 将pred前一节点 waitStatus由默认值改为 Node.SIGNAL
            // 修改完成后则 parkAndCheckInterrupt
            // 等待某个时刻被唤醒后,唤醒后执行 Thread.interrupted,表示线程被中断唤醒(同时清除标志位)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
3.1.1.2.1shouldParkAfterFailedAcquire
// 该值默认0从达到小,CANCELLED SIGNAL CONDITION PROPAGATE
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

volatile int waitStatus;

这里我们需重点分析一下 队列中除tail节点 pred Node waitStatus的流转流程,该值默认0-->-1,表示后面节点需要唤醒。

// init:
// pred:前驱节点,init head节点:0
// node:当前节点:0
// exec result:
// pred:前驱节点,init head节点:0
// node:当前节点:0(最后一个节点都是0)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 设置头节点waitStatus:-1,unlock时会用到,具体看3.2节
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
3.1.1.2.2parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
// 判断当前线程是否被中断,并且清除中断标志位
public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

3.1.1.3selfInterrupt

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

3.4解锁unlock源码分析

image-20210411230148669.png
public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) { // 释放锁成功(本质就是修改标志位)
        Node h = head; // 每次都是从head节点开始遍历
        if (h != null && h.waitStatus != 0) // h.waitStatus=-1
            unparkSuccessor(h);
        return true;
    }
    return false;
}

3.4.1tryRelease

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // c==0,表示可以释放锁了
    if (Thread.currentThread() != getExclusiveOwnerThread()) // 不能释放别的线程的锁
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null); // 清空 AQS.exclusiveThread
    }
    setState(c); //设置AQS.state=0
    return free;
}
3.4.1.1.1unparkSuccessor

T1线程业务处理完成,唤醒后继节点,这里即T2线程。

// node为当前AQS的head头节点
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus; // 头结点 waitStatus当前为:-1
    if (ws < 0)  // 设置head Node waitStatus:-1--> 0,表示后继节点唤醒流程引导完成
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next; // waitStatus 初始化完成后,基本上都是 -1 -1 0。
    if (s == null || s.waitStatus > 0) { // 防止节点线程自我取消了。
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // 这里唤醒线程 AQS 中的第一个非空节点T2
        LockSupport.unpark(s.thread);
}
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread); // 回到:3.1.1.2.2,即return Thread.interrupted();
}

4总结

4.1公平与非公平锁本质区别分析

公平与非公平锁本质就是在调用基类AQS中的acquire方法内部tryAcquire方法,会根据子类AQS子类Sync、NonfairSync去调用不同的实现。

// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

公平锁:tryAcquire:hasQueuedPredecessors

// FairSync
final void lock() {
    acquire(1);
}
// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平锁:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

4.2锁同步

AQS数据结构为FIFO双向链表。

条件变量Condition的wait,signal等价于Jdk原生对象Object的wait,Notify,NotifyAll。

ConditionObject可构建多个等待队列,Lock(同步队列Q1)和Condition(条件等待队列Q2),其实就是两个队列的互相移动。

相关文章

  • 2021-04-11_AQS锁互斥源码学习笔记总结

    20210411_AQS锁互斥源码学习笔记总结 1概述 AQS是一个用来构建锁和同步器的框架,Lock包中的锁(R...

  • 2022-05-06_JavaLockSupport示例互斥锁学

    20220506_JavaLockSupport示例互斥锁学习笔记.md 1概述 1.1LockSupport L...

  • 画分布式锁之"通文馆圣主"Curator的&

    上一篇,我们基于示例和源码去剖析了可重入互斥锁,不可重入互斥锁,信号量锁,参见【画分布式锁之"通文馆圣主"C...

  • 6.数据库

    Android数据库ORM框架用法、源码和性能比较分析 synchronized与lock 对象锁、互斥锁、共享锁...

  • (七)golang 互斥量 源码分析

    互斥锁 源码位置:https://github.com/golang/go/blob/master/src/syn...

  • 线程同步与互斥

    Linux--线程编程 多线程编程-互斥锁 线程同步与互斥 互斥锁 信号量 条件变量 互斥锁 互斥锁的基本使用...

  • go RWMutex源码解析

    RWMutex 基于go 1.13源码总的来说读写锁就是利用互斥锁和CAS维护2个关于读锁的变量以及runtime...

  • Golang 锁的相关知识

    Golang锁分类:互斥锁(Mutex)、读写锁(RWMutex)。 互斥锁 在编写代码中引入了对象互斥锁的概念,...

  • iOS底层探索-多线程锁

    多线程的锁大致可分为两大类:互斥锁、自旋锁;也可以分为三类:互斥锁、自旋锁、读写锁。 一、互斥锁:互斥+同步(强调...

  • go-锁机制

    Golang中的锁机制主要包含互斥锁和读写锁 互斥锁 互斥锁是一种简单的加锁的方法来控制对共享资源的访问,互斥锁只...

网友评论

      本文标题:2021-04-11_AQS锁互斥源码学习笔记总结

      本文链接:https://www.haomeiwen.com/subject/bpzzkltx.html