1.定义
锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时 访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。同时也保证了资源的可见性。
2.Lock使用
2.1方法
Lock是一个接口,它定义了锁获取和释放的基本操作
基本使用方法:
Lock lock = new ReentrantLock();
lock.lock();
try {
} finally {
lock.unlock(); }
方法名 | 描述 |
---|---|
void lock() | 获取锁,当前线程获取锁,获取后从方法返回 |
void lockInterruptibly() throws InterruptException | 获取锁中可响应中断 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法后立即返回 |
boolean tryLock(long time,TimeUnit unit) throws InterruptException | 可响应超时 |
void unlock() | 释放锁 |
Condition newCondition() | 获取等待通知组件,该组件和当前锁绑定,当前线程只有获取锁之后,才能调用该组件的wait()方法,调用后释放锁 |
2.2重入锁ReentrantLock(重入vs非重入)
重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对
资源的重复加锁。ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方 法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。
package lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockMain {
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread thread = new Thread(new ReentrantLockDomino());
thread.start();
}
static class ReentrantLockDomino implements Runnable {
@Override
public void run() {
lock.lock();
try {
System.out.println("ReentrantLockDomino run .........");
execute();
} finally {
lock.unlock();
}
}
private void execute() {
lock.lock();
try {
System.out.println("execute run .........");
} finally {
lock.unlock();
}
}
}
}
2.3读写锁ReentrantReadWriteLock(共享vs排他)
读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读 线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写 锁,使得并发性相比一般的排他锁有了很大提升。
接下来,通过一个缓存示例说明读写锁的使用方式:
package lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReentrantReadWriteLockMain {
private static Map<String, Object> map = new HashMap<>();
private static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private static Lock r = rwl.readLock();
private static Lock w = rwl.writeLock();
// 获取一个key对应的value
public static Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 设置key对应的value,并返回旧的value
public static Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的内容
public static void clear() {
w.lock();
try {
map.clear();
} finally {
w.lock();
}
}
}
2.4公平锁(公平锁vs非公平锁)
如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线 程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock提供了一个构造函数,能够控制锁 是否是公平的。
事实上,公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以TPS作为 唯一的指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
3.AbstractQueuedSynchronized 队列同步器(AQS)
队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组 件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状 态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3 个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部 类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来 供自定义同步组件使用。
3.1方法
同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的 方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些 模板方法将会调用使用者重写的方法。
重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。
1.getState():获取当前同步状态。
2.setState(int newState):设置当前同步状态。
3.compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态 设置的原子性
同步器可重写的方法:
image.png
实现自定义同步组件时,将会调用同步器提供的模板方法,如下:
image.png
3.2队列同步器的实现
接下来将从实现角度分析同步器是如何完成线程同步的,主要包括:同步队列、独占式同 步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器的核心数据 结构与模板方法。
3.2.1同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取 同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其 加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再 次尝试获取同步状态。
同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和 后继节点,节点的属性类型与名称以及描述如表5-5所示
image.png
节点是构成同步队列(等待队列,同步器拥有首节点(head) 和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的 基本结构如图5-1所示。
image.png
同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。 试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转 而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此
同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式 与之前的尾节点建立关联。
[图片上传中...(image.png-65b3ef-1578465786330-0)]
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态 时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,该过程 如图5-3所示。
image.png
3.2.2独占式同步状态的获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感。代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
}
该操作分为3步:
1.首先调用自定义同步器实现的tryAcquire(int arg)方法,进行同步状态获取,如果获取失败,则进行第二步。
2.构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node) 方法将该节点加入到同步队列的尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。
3.最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
而只有前驱节点是头节点才能够尝试获取同步状态,否则的话将通过parkAndCheckInterrupt进入阻塞状态。原因如下:
第一,头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会 唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
第二,维护同步队列的FIFO原则。
image.png
由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自 己的前驱是否是头节点,如果是则尝试获取同步状态。可以看到节点和节点之间在循环检查 的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释 放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程 由于中断而被唤醒)。
image.png
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能 够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释 放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用 LockSupport(在后面的章节会专门介绍)来唤醒处于等待状态的线程。
3.2.3共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状 态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操 作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问,两种不同的访问模式在同一时刻对文件或资源的访问情况
image.png
通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态:
public final void acquireShared(int arg) {
//1.自定义实现
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.tryAcquireShared(int arg)>0获取同步状态成功(区别于独占锁,是直接返回boolean),否则进入第二步。
2.调用acquireShared(int arg),共享式获取的自旋。只有在前驱节点是head的情况下才会调用tryAcquireShared(int arg)尝试获取共享状态,否则阻塞当前线程。
与独占式一样,共享式获取也需要释放同步状态,
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线 程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg) 方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为 释放同步状态的操作会同时来自多个线程。
3.2.4独占式超时状态的获取
待补充
3.3重入锁的原理
待补充
3.4公平锁的原理
待补充
3.5读写锁的原理
待补充
4. LockSupport
concurrent包是基于AQS (AbstractQueuedSynchronizer)框架的,AQS框架借助于两个类:
Unsafe(提供CAS操作)
LockSupport(提供park/unpark操作)
方法如下:
image.png
源码:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
5. Condition
5.1方法
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创 建出来的,换句话说,Condition是依赖Lock对象的。
Condition的使用方式比较简单,需要注意在调用方法前获取锁,如下:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock(); try {
condition.await(); } finally {
lock.unlock();
} }
public void conditionSignal() throws InterruptedException { lock.lock();
try {
condition.signal(); } finally {
lock.unlock();
} }
如示例所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会 释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程 才从await()方法返回,并且在返回前已经获取了锁。
方法如下:
image.png
5.2使用
获取一个Condition必须通过Lock的newCondition()方法。下面通过一个有界队列的示例来 深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作 将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线 程,直到队列出现“空位”,代码如下:
package lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 有界队列:FIFO
*/
public class BoundQueue<T> {
private Object[] items;
private int addIndex;
private int removeIndex;
private int count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundQueue(int size) {
this.items = new Object[size];
}
//添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (items.length == count) {
notFull.await();
}
items[addIndex++] = t;
if (addIndex == count) {
addIndex = 0;
}
count++;
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
//由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
public Object remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
Object object = items[removeIndex++];
if (removeIndex == items.length) {
removeIndex = 0;
}
count--;
notFull.signalAll();
return object;
} finally {
lock.unlock();
}
}
}
5.3原理
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要 获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队 列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。
5.3.1等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是 在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会 释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点 的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类 AbstractQueuedSynchronizer.Node。
一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点 (lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部 加入等待队列,
image.png
如图所示,Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter 指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用 await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的 Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列
image.png
5.3.2等待
调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释 放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相 关联的锁。
如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同 步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
Condition的await()方法,如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//1.创建节点,当前线程加入等待队列
Node node = addConditionWaiter();
//2.释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前 线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当 前线程会进入等待状态。
image.png
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过 其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出 InterruptedException。
5.3.3通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在 唤醒节点之前,会将节点移到同步队列中。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException(); Node first = firstWaiter;
if (first != null) doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了 isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。 节点从等待队列移动到同步队列的过程如图5-12所示。
image.png
通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队
列。当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。
被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法 返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状 态的竞争中。
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此 时该线程已经成功地获取了锁。
Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效 果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
网友评论