以下分析均基于jdk1.8
AQS 是一个用于实现阻塞锁和相关同步器的框架,它提供了一些基本的原子操作(如 CAS,自旋等待)以及一个等待队列来协调多个线程之间的互斥和共享访问。ReentrantLock 实现了 AQS 的 tryAcquire
和 release 方法来获取和释放锁,以及 Condition 来支持锁的条件等待。
ReentrantLock 在基于 AQS 实现的同时,也可以通过重入锁的方式实现线程的可重入性,使得同一个线程可以多次获取同一个锁而不会死锁。此外,ReentrantLock 还提供了公平锁和非公平锁两种模式,以适应不同的应用场景。
AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这段代码是 AQS(AbstractQueuedSynchronizer)类中的 acquire(int arg) 方法实现。该方法用于获取独占式锁(exclusive lock)。以下是方法的含义和执行流程:
- tryAcquire(arg) 尝试直接获取独占锁,如果成功返回 true,否则执行第二步。
- addWaiter(Node.EXCLUSIVE) 创建一个独占节点 Node.EXCLUSIVE,将其加入到等待队列中,并返回该节点。
- acquireQueued(node, arg) 将节点 node 加入到等待队列中,并阻塞线程,直到获取独占锁成功为止。
- selfInterrupt() 如果线程在等待过程中被中断,调用 selfInterrupt() 方法将线程的中断状态重新设置。
因此,该方法的作用是获取独占式锁,并在获取不到锁时将线程加入到等待队列中,直到锁被释放或者线程被中断才返回。
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 循环执行步骤 1 到步骤 4,直到成功获取到锁或者线程被中断。
for (;;) {
// 1. 返回等待队列中当前节点的前一个节点。
final Node p = node.predecessor();
// 2. 如果前一个节点是 head(即等待队列的首节点)并且能够通过 tryAcquire(arg) 方法获取到锁,就将当前节点设为新的 head,将前一个节点的 next 引用设置为 null(以便垃圾回收),然后返回中断状态。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 3. 如果无法获取到锁,调用 shouldParkAfterFailedAcquire(p, node) 判断当前线程是否应该挂起等待。
// 如果当前线程应该挂起等待,则调用 parkAndCheckInterrupt() 方法将线程挂起并检查中断状态,如果线程在等待期间被中断,将中断状态设置为 true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果等待过程中获取锁失败,调用 cancelAcquire(node) 方法取消该节点的等待状态,并从等待队列中移除。
if (failed)
cancelAcquire(node);
}
}
这段代码是 AQS(AbstractQueuedSynchronizer)类中的 acquireQueued(Node node, int arg) 方法实现。该方法用于将节点加入到等待队列中,并且在队列中自旋等待直到获取锁为止。以下是该方法的主要步骤:
- node.predecessor() 返回等待队列中当前节点的前一个节点。
- 如果前一个节点是 head(即等待队列的首节点)并且能够通过 tryAcquire(arg) 方法获取到锁,就将当前节点设为新的 head,将前一个节点的 next 引用设置为 null(以便垃圾回收),然后返回中断状态。
- 如果无法获取到锁,调用 shouldParkAfterFailedAcquire(p, node) 判断当前线程是否应该挂起等待。
- 如果当前线程应该挂起等待,则调用 parkAndCheckInterrupt() 方法将线程挂起并检查中断状态,如果线程在等待期间被中断,将中断状态设置为 true。
- 循环执行步骤 1 到步骤 4,直到成功获取到锁或者线程被中断。
- 如果等待过程中获取锁失败,调用 cancelAcquire(node) 方法取消该节点的等待状态,并从等待队列中移除。
因此,acquireQueued(Node node, int arg) 方法实现了等待队列中的节点自旋等待获取锁,直到获取到锁或者线程被中断。在等待期间,会根据节点的前驱节点和锁状态来判断是否需要挂起等待,并且通过循环不断重试获取锁的操作。
公平锁
在 JDK 1.8 中,ReentrantLock
在公平模式下锁的抢占规则如下:
- 当线程尝试获取锁时,如果锁当前没有被其他线程持有,那么该线程将立即获得锁并成为锁的持有者。
- 当线程尝试获取锁时,如果锁当前被其他线程持有,那么该线程将进入等待队列,以 FIFO(先进先出)的顺序排队等待获取锁。
- 当锁的持有者释放锁时,如果等待队列中存在线程,则会选择队列中的第一个线程作为下一个持有者,并将其从等待队列中移除,然后唤醒该线程,使其尝试获取锁。
- 如果有多个线程在等待队列中等待获取锁,那么它们将按照先进先出的顺序进行竞争,直到其中一个线程成功获取锁为止。
在公平模式下,所有线程将按照其请求锁的顺序进行竞争,因此不会出现饥饿现象,也就是说,没有任何线程会无限期地等待获取锁。但是,由于需要维护等待队列,因此在高并发场景下,公平模式可能会导致性能下降。
lock
final void lock() {
acquire(1);
}
tryAcquire
protected final boolean tryAcquire(int acquires) {
// 1. 获取当前线程
final Thread current = Thread.currentThread();
// 2. 获取当前锁的状态值
int c = getState();
// 3. 判断锁是否可用
if (c == 0) {
// 3.1 如果当前锁的状态值为0,说明锁是可用的。如果没有排队的线程且能够通过CAS(compareAndSetState)操作将锁的状态值从0改为acquires,则获取锁成功,将当前线程设置为独占锁的线程并返回true。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 4. 判断是否是独占锁的线程,如果当前线程是独占锁的线程,说明已经拥有了锁,此时通过增加锁的状态值来实现可重入,并返回true。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 5. 如果无法获取锁,则返回false。
return false;
}
hasQueuedPredecessors
其中对于hasQueuedPredecessors
方法,官方是这样描述的:
- 如果当前线程前面有一个排队的线程,则为
true
- 如果当前线程位于队列的头部或队列为空,则返回
false
getExclusiveOwnerThread
getExclusiveOwnerThread()是Java中AbstractOwnableSynchronizer(AOS)类的方法,用于获取当前锁的独占线程。该方法通常与可重入锁(ReentrantLock)结合使用。
在可重入锁中,同一个线程可以多次获取锁,每次获取锁时,都会将锁的状态值加1。如果当前线程已经获取了锁,则它就是锁的独占线程。getExclusiveOwnerThread()方法可以获取当前锁的独占线程,如果当前锁没有被线程占用,则返回null。
在tryAcquire()方法中,如果当前线程已经获取了锁,则可以直接对锁的状态值进行修改,而无需再次获取锁。因此,通过getExclusiveOwnerThread()方法来判断当前线程是否已经获取了锁。
总的来说,getExclusiveOwnerThread()方法的作用是获取当前锁的独占线程,可以用于判断当前线程是否已经获取了锁,从而进行相应的处理。
非公平锁
lock
final void lock() {
// 1. 尝试获取锁,如果成功,设置当前线程为独占线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 2. 否则调用AQS的acquire方法,子类实现nonfairTryAcquire方法
acquire(1);
}
nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
// 1. 首先,获取当前线程对象并获取当前锁的状态值。
final Thread current = Thread.currentThread();
int c = getState();
// 2. 如果当前锁的状态值为0,表示当前锁没有被其他线程占用,则当前线程可以直接获取锁。
if (c == 0) {
// 3. cas尝试获取锁,如果成功,设置当前线程为独占锁,并且返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 4. 如果当前锁的状态值不为0,但是当前线程已经获取了锁,则可以直接对锁的状态值进行修改,而无需再次获取锁。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
condition
ReentrantLock是一个可重入的互斥锁,它提供了比内置锁更高级的同步功能。在使用ReentrantLock时,我们可以通过调用它的newCondition()方法创建一个Condition对象,来实现更加灵活的线程同步。
Condition是在Java 5中引入的一种新的线程同步机制,它提供了await()和signal()等方法,可以用于线程之间的通信和协调。
ReentrantLock的newCondition()方法可以创建一个与当前锁关联的Condition对象。调用该Condition对象的await()方法可以使当前线程等待,直到另一个线程调用该Condition对象的signal()方法或signalAll()方法唤醒它。
举个例子,假设我们有一个任务队列,多个线程需要从队列中获取任务并执行。我们可以使用ReentrantLock来实现对队列的同步,并且为每个线程分配一个Condition对象,以便在队列为空时等待任务的到来。具体实现可以参考下面的代码:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TaskQueue {
private Queue<String> queue = new LinkedList<>();
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
public void addTask(String task) {
lock.lock();
try {
queue.add(task);
notEmpty.signal(); // 通知等待的线程
} finally {
lock.unlock();
}
}
public String getTask() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待任务的到来
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
在这个示例中,我们使用ReentrantLock和Condition实现了一个线程安全的任务队列。当任务队列为空时,调用getTask()方法的线程会等待,直到其他线程调用addTask()方法向队列中添加任务并通过notEmpty.signal()通知它们。
注意,在使用Condition对象时,一定要在调用await()方法前获得锁,并在finally块中释放锁。否则,如果线程在等待时被中断,它会持有锁而无法释放,导致其他线程无法获取锁。
网友评论