美文网首页君临天下-Java
ReentrantLock源码解析

ReentrantLock源码解析

作者: 老荀 | 来源:发表于2020-04-04 17:04 被阅读0次

主要成员

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }
}

两种实现

static final class NonfairSync extends Sync {
    ...
}
static final class FairSync extends Sync {
    ...
}

构造器

// 默认是非公平的实现
public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

主要方法

// 都是直接调用成员sync去实现的
public void lock() {
    sync.lock();
}

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

public void unlock() {
    sync.release(1);
}

AbstractQueuedSynchronizer

在聊Sync之前,必须得聊下AQS,几乎是所有java并发工具的基石了

// AQS的父类,持有一个线程的成员
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    protected AbstractOwnableSynchronizer() { }
    // 这个成员指向持有当前AQS的线程
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    protected AbstractQueuedSynchronizer() { }
    ...
}

等待实现的方法

AQS是个抽象类,但是已经实现了绝大部分的功能,只有部分方法必须由子类来实现

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

重要成员

AQS内部实现了一个双向链表结构

private transient volatile Node head;
private transient volatile Node tail;
// 这个字段没有在构造器里初始化所以初始化值就是0
private volatile int state;
// 这个node会持有一个线程对象
static final class Node {
        ...
    volatile int waitStatus;
       
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    ...
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

lock

非公平锁

final void lock() {
    // 第一次插队的机会,无视state直接更新
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

会调用到AQS的方法

public final void acquire(int arg) {
    // tryAcquire 是由子类实现的
    if (!tryAcquire(arg) &&
        // && 之后的方法不区分公平和非公平,ReentrantLock都是排他锁 所以是Node.EXCLUSIVE
        // addWaiter就是把自己添加到AQS的链表中
        // acquireQueued可以理解为开始排队
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 第二次插队的机会,直接使用CAS尝试去更新state,从0更新到1
        if (compareAndSetState(0, acquires)) {
            // 成功了就是获得锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 提前剧透 下面的else if和公平锁没有区别
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
           throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

如果插队失败,当前线程就会成为遵纪守法好公民,尝试进入队列并开始排队

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 最最一开始tail没有初始化 这里是null
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 尝试排在队列末尾,也是用CAS的方式,是原子操作,线程安全的
        if (compareAndSetTail(pred, node)) {
            // 入队成功,返回
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 用死循环保证入队成功
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
               // 初始化,可以看到tail和head指向是一致的
               tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 前一个节点是head,自己是队首,才能尝试去获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果不是就会根据前一个节点的状态,来决定是否等待
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 等待
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
        }
    }
}

公平锁

final void lock() {
    acquire(1);
}

直接看公平锁的tryAcquire方法

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 这个hasQueuedPredecessors就是公平和非公平最大的区别了
        // 返回false才有资格去获取锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
         setState(nextc);
         return true;
     }
     return false;
 }
public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

关于Node的状态

Node定义了5种状态
如果在代码里看到waitStatus>0的条件,就是判断当前Node是不是取消了

static final class Node {
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    // 初始化是0
    volatile int waitStatus;
}

再回头看刚刚的acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            ...
        }
    } finally {
        if (failed)
            // 只要在try代码块中有抛出异常,就会进入cancel的流程
            cancelAcquire(node);
        }
    }
}
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    // 和自己的线程解约
    node.thread = null;
    Node pred = node.prev;
    // 一直循环找到状态不是cancel的节点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    // 将当前节点置为cancel
    node.waitStatus = Node.CANCELLED;

    // 如果自己就是队尾节点,把上一个节点设置成tail
    if (node == tail && compareAndSetTail(node, pred)) {
        // 设置成功后把上一个节点的next设置为空
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 介绍自己的下家给上家认识,自己深藏功与名
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 自己是队头就走unlock的流程
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

unlock

解锁操作是不区分公平和非公平的

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 只有c等于0了,才是释放锁
        // 这里也牵涉到ReentrantLock是可重入锁,是可以反复上锁的
        // 所以对应的就要反复解锁,state这个int字段,就可以理解为上锁的次数
        free = true;
        // 把AQS中的独占线程清空
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

tryLock

上面看懂后,没有参数的tryLock就特别简单,因为就是非公平锁的实现,并且如果没有获取成功,也不会入队

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

带参数的tryLock

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 死循环直到获取锁或者超时
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                // 超时
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 用这个LockSupport等待指定的时间
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 因为failed默认是true,所以不管是超时还是抛异常,最终都会走到cancel流程
            cancelAcquire(node);
    }
}

高级篇

相关文章

  • ReentrantLock

    ReentrantLock的lock和unlock源码解析

  • ReentrantLock重入锁和 AQS同步器源码解析

    ReentrantLock重入锁和 AQS同步器源码解析 AQS就是AbstractQueuedSynchroni...

  • ReentrantLock源码解析

    前言 心血来潮想到要做一些源码分析,想想从ReentrantLock类开始挺合适的,就尝试着先写一篇吧,废话不多说...

  • ReentrantLock源码解析

    简介 ReentrantLock是一个可重入的独享锁,是平时常用的一个锁,用法和实现都比较简单,如下: 还有Ree...

  • ReentrantLock源码解析

    看这部分的前提是大家已经看过AbstractQueuedSynchronizer这个类,知道它是个啥了哈,如果不知...

  • ReentrantLock源码解析

    1. 简介 ReentrantLock与synchronized关键字一样是可重入的独占锁,不过Reentrant...

  • ReentrantLock源码解析

    ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,支持重入性,表...

  • ReentrantLock源码解析

    ReentrantLock编码示例 ①new ReentrantLock() 无参数构造函数初始化一个非公平锁。关...

  • ReentrantLock源码解析

    首先来看ReentrantLock的公平锁实现源码 第一步便是判断锁是不是自由状态,如果是则判断直接是否需要排队(...

  • ReentrantLock源码解析

    要理解ReentrantLock,首先要理解AbstractQueuedSynchronizer。Abstract...

网友评论

    本文标题:ReentrantLock源码解析

    本文链接:https://www.haomeiwen.com/subject/cdpkphtx.html