第十三章 显示锁
与内置锁相比,显式的Lock提供了一些扩展功能,在处理锁的不可用性方面有着更高的灵活性,并且对队列有着更好的控制。
但ReentrantLock不能完全替代synchronized,只有在synchronized无法满足需求时,才应该使用。
13.1 Lock与ReentraLock
13-1给出的Lock接口定义了一组抽象的加锁操作。与内置加锁机制不同的是,Lock提供了一种无条件的,可轮询的,定时的以及可中断的锁获取操作。
所有加锁和解锁方法都是显式的。在Lock的实现中必须提供与内部锁相同的内存可见性语义,但在加锁语义,调度算法,顺序保证以及性能特性等方面可以有所不同。
程序清单13-1
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException;
void unlock();
Condition newCondition();
}
ReentrantLock实现了Lock接口,并提供了与synchronized相同的互斥性和内存可见性。
在获取ReentrantLock时,有着与进入同步代码块相同的内存语义,在释放ReentrantLock时,同样有着与退出同步代码块相同的内存语义。
此外,与synchronized一样,ReentrantLock还提供了可重入的加锁语义(2.3.2节)。
ReentrantLock支持在Lock接口中定义的所有获取锁模式,并且与synchronized相比,它还为处理锁的不可用性问题提供了更高的灵活性。
在多数情况下,内置锁都能很好地工作,但在功能上存在一些局限性,例如,无法中断一个正在等待获取锁的线程,或者无法在请求获取一个锁时无限地等待下去。内置锁必须在获取该所的代码块中释放,这就简化了编码工作,并且与异常处理操作实现了很好的交互,但却无法实现非阻塞结构的加锁规则。
13-2给出了Lock接口的标准使用形式,这种形式比内置锁复杂一些:必须在finally块中释放锁。否则,如果在被保护的代码中抛出了异常,那么这个锁将永远无法释放。
当使用加锁时,还必须考虑在try中抛出异常的情况,如果可能使对象处于某种不一致的状态,那么就需要更多的try-catch或try-finally代码块。
程序清单13-2
Lock lock = new ReentrantLock();
...
lock.lock();
try {
// 更新对象状态
//捕获异常,并在必要时恢复不变性条件
} finally {
lock.unlock();
}
13.1.1 轮询锁与定时锁
可定时的与可轮询的锁获取模式是由tryLock方法实现的,与无条件的锁获取模式相比,它具有更完善的错误恢复机制。
在内置锁(intrinsic lock)中,死锁时一个严重的问题,恢复程序的唯一方法时重新启动程序,而防止死锁的唯一方法是在构造程序时避免出现不一致的锁顺序。
可定时和可轮询的锁提供了另一种选择:避免死锁发生。
如果不能获得所有需要的锁,那么可以使用可定时的或可轮询的锁获取方式,从而使你重新获得控制权,它会释放已经获得锁,然后重新尝试获取所有锁(或者至少将这个失败记录到日志,并采取其他措施)。
13-3中给出了另一种方法来解决10.1.2中动态顺序死锁的问题:使用tryLock来获取两个锁,如果不能同时获得,那么就回退并重新尝试。
在休眠时间中包含固定部分和随机部分,从而降低发生活锁的可能性。
如果在指定时间内不能获得所有需要的锁,那么transferMoney将返回一个失败状态,从而使该操作平缓地失败。
程序清单13-3
public class DeadlockAvoidance {
private static Random rnd = new Random();
public boolean transferMoney(Account fromAcct,
Account toAcct,
DollarAmount amount,
long timeout,
TimeUnit unit)
throws InsufficientFundsException, InterruptedException {
long fixedDelay = getFixedDelayComponentNanos(timeout, unit);
long randMod = getRandomDelayModulusNanos(timeout, unit);
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (true) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
return true;
}
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() < stopTime)
return false;
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}
private static final int DELAY_FIXED = 1;
private static final int DELAY_RANDOM = 2;
static long getFixedDelayComponentNanos(long timeout, TimeUnit unit) {
return DELAY_FIXED;
}
static long getRandomDelayModulusNanos(long timeout, TimeUnit unit) {
return DELAY_RANDOM;
}
static class DollarAmount implements Comparable<DollarAmount> {
public int compareTo(DollarAmount other) {
return 0;
}
DollarAmount(int dollars) {
}
}
class Account {
public Lock lock;
void debit(DollarAmount d) {
}
void credit(DollarAmount d) {
}
DollarAmount getBalance() {
return null;
}
}
class InsufficientFundsException extends Exception {
}
}
在实现具有时间限制的操作时,定时锁非常有用(6.3.7节)。
当在带有时间限制的操作中调用了一个阻塞方法时,它能根据剩余时间来提供一个时限。如果操作不能在指定时间内给出结果,那么程序就会提前结束。
当使用内置锁时,在开始请求锁后,这个操作将无法取消,因此内置锁很难时限带有时间限制的操作。
13-4尝试在Lock保护的共享通信线路上发送一条消息, 如果不能在指定时间内完成,代码就会失败。
定时的tryLock能够在这个带有时间限制的操作中实现独占加锁行为。
程序清单13-4
public class TimedLocking {
private Lock lock = new ReentrantLock();
public boolean trySendOnSharedLine(String message,
long timeout, TimeUnit unit)
throws InterruptedException {
long nanosToLock = unit.toNanos(timeout)
- estimatedNanosToSend(message);
if (!lock.tryLock(nanosToLock, NANOSECONDS))
return false;
try {
return sendOnSharedLine(message);
} finally {
lock.unlock();
}
}
private boolean sendOnSharedLine(String message) {
/* send something */
return true;
}
long estimatedNanosToSend(String message) {
return message.length();
}
}
13.1.2 可中断的锁获取操作
可中断的锁获取操作能在可取消的操作中使用加锁。
7.1.6中给出了几种不能响应中断的机制,例如请求内置锁。这些不可中断的阻塞机制将使的实现可取消的任务变得复杂。
lockInterruptibly方法能够在获得锁的同时保持对中断的响应,并且由于它包含在Lock中,因此无需创建其他类型的不可中断阻塞机制。
可中断的锁获取操作的标准结构比普通的锁获取操作略复杂一些,因为需要两个try块(如果在可中断的锁获取操作中抛出了InterruptedException,那么可以使用标准的try-finally加锁模式)。
13-5中使用了lockInterruptibly来实现13-4中的sendOnSharedLine,以便在一个可取消的任务中调用它。
定时的tryLock同样能响应中断,因此当需要一个定时的和可中断的锁获取操作时,可以使用tryLock方法。
程序清单13-5
public class InterruptibleLocking {
private Lock lock = new ReentrantLock();
public boolean sendOnSharedLine(String message)
throws InterruptedException {
lock.lockInterruptibly();
try {
return cancellableSendOnSharedLine(message);
} finally {
lock.unlock();
}
}
private boolean cancellableSendOnSharedLine(String message) throws InterruptedException {
/* send something */
return true;
}
}
13.1.3 非块结构的加锁
在内置锁中,锁的获取和释放等操作都是基于代码块的——释放锁的操作总是与获取锁的操作处于同一个代码块,而不考虑控制权如何退出该代码块。
11章中,通过降低锁的粒度提高了代码的可伸缩性。锁分段技术在基于散列的容器中实现了不同的散列链,以便使用不同的锁。
我们可以采用类似原则来降低链表中锁的粒度,为每个链表节点使用一个独立的锁,使不同的线程能独立地对链表的不同部分进行操作。每个节点的锁将保护连接指针以及在该节点中存储的数据,因此当遍历或修改链表时,我们必须持有该节点上的这个锁,直到获得了下一个节点的锁,这样我们才能释放上一个节点的锁。
13.3 公平性
在ReentrantLock的构造函数中提供了两种公平性选择:创建一个非公平的锁(默认)或者一个公平的锁。
在公平的锁上,线程将按照它们发出请求的顺序来获得锁,但在非公平的锁上,则允许“插队”:当一个线程请求非公平的锁时,如果在发出请求的同时该锁的状态变为可用,那么这个线程将跳过队列中所有的等待线程并获得这个锁(在Semaphore中同样可以选择采用公平或非公平的获取顺序)。
在激烈竞争的情况下,非公平锁的性能高于公平锁,其中的一个原因时:在恢复一个被挂起的线程与该线程真正开始运行之间存在着严重的延迟。
假设线程A持有一个锁,并且线程B请求这个锁。由于A持有这个锁,因此B挂起。当A释放锁时,B将被唤醒,因此会再次尝试获取锁。此时,如果C也请求这个锁,那么C很可能在B被完全唤醒之前获得,使用及释放这个锁。
这是一种“双赢”的局面:B获得锁的时刻并没有推迟,C更早地获得了锁,并且吞吐量也获得了提高。
当持有锁的时间相对较长,或者请求锁的平均时间间隔较长,那么应该使用公平锁。
13.4 在synchronized与ReentrantLock之间进行选择
在一些内置锁无法满足需求的情况下,ReentrantLock可以作为一种高级工具。当需要一些高级功能时才应该使用ReentrantLock,这些功能包括:可定时的,可轮询的与可中断的锁获取操作,公平队列,以及非块结构的锁。否则,还是应该优先使用synchronized。
13.5 读-写锁
ReentrantLock实现了一种标准的互斥锁:每次最多只有一个线程能持有ReentrantLock。
大多数访问操作都是“读操作”,如果能够放宽加锁需求,允许多个执行读操作的线程同时访问数据结构,那么将提升程序的性能。只要每个线程都能确保读取到最新的数据,并且在读取数据时不会有其他线程修改数据,那么就不会发生问题。
在这种情况下就可以使用读/写锁:一个资源可以被多个读操作访问,或者被一个写操作访问,但两者不能同时进行。
13-6中的ReadWriteLock暴露了两个Lock对象,其中一个用于读操作,而另一个用于写操作。要读取有ReadWriteLock保护的数据,必须先获得读取锁,当需要修改ReadWriteLock保护的数据,必须先获得写入锁。
程序清单13-6
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
读—写锁是一种性能优化措施,在一些特定的情况下能实现更高的并发性(在多处理器系统上被频繁读取的数据结构)。
在读取锁和写入锁之前的交互可以采用多种实现方式。ReadWriteLock中的一些可选实现包括:
- 释放优先
当一个写入操作释放写入锁时,并且队列中同时存在读线程和写线程,那么应该优先选择读线程,写线程,还是最先发出请求的线程? - 读线程插队
如果锁是由读线程持有,但有写线程正在等待,那么新达到的读线程能否立即获得访问权,还是应该在写线程后面等待?如果允许读线程插队到写线程之前,那么将提高并发性,但却可能造成写线程发生饥饿问题。 - 重入性
- 降级
如果一个线程持有写入锁,那么它能否在不释放该锁的情况下获得读取锁?这可能会使得写入锁被降级为读取锁,同时不允许其他写线程修改被保护的资源, - 升级
读取锁能够优先于其他正在等待的读线程和写线程而升级为一个写入锁?
ReentrantReadWriteLock为这两种锁都提供了可重入的加锁语义。ReentrantReadWriteLock在构造时也可以选择时一个非公平的锁(默认)还是一个公平的锁。
ReentrantReadWriteLock中的写入锁只能有唯一的拥有者,并且只能由获得该锁的线程来释放。
13-7的TeadWriteMap中使用了ReentrantReadWriteLock来包装Map,从而使它能在多个读线程直接被安全地共享,并且仍然避免“读—写”或“写—写”冲突。
ConcurrentHashMap的性能已经很好了,如果只需要一个并发的基于散列的映射,就可以使用ConcurrentHashMap来代替这种方法,但如果需要对另一种Map实现(例如LinkedHashMap)提供并发性更高的访问,可以使用这种技术。
程序清单13-7
public class ReadWriteMap <K,V> {
private final Map<K, V> map;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock r = lock.readLock();
private final Lock w = lock.writeLock();
public ReadWriteMap(Map<K, V> map) {
this.map = map;
}
public V put(K key, V value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
public V remove(Object key) {
w.lock();
try {
return map.remove(key);
} finally {
w.unlock();
}
}
public void putAll(Map<? extends K, ? extends V> m) {
w.lock();
try {
map.putAll(m);
} finally {
w.unlock();
}
}
public void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
public V get(Object key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
public int size() {
r.lock();
try {
return map.size();
} finally {
r.unlock();
}
}
public boolean isEmpty() {
r.lock();
try {
return map.isEmpty();
} finally {
r.unlock();
}
}
public boolean containsKey(Object key) {
r.lock();
try {
return map.containsKey(key);
} finally {
r.unlock();
}
}
public boolean containsValue(Object value) {
r.lock();
try {
return map.containsValue(value);
} finally {
r.unlock();
}
}
}
网友评论