5.1 同步容器类
同步容器类包括 Vector
和 Hashtable
,二者是早期 JDK 的一部分,此外还包括在 JDK 1.2 中添加的一些功能相似的类,这些同步的封装器类是由 Collections.synchronizedXxx
等工厂方法去创建的:
这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
5.1.1 同步容器类的问题
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。如下程序清单 5-1:
// 程序清单 5-1
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
// 调用 get 方法时,vector 可能已经被下面的 deleteLast 更改了
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndext = list.size() - 1;
list.remove(lastIndext);
}
上面代码出错的根本原因是因为代码中的方法不是原子操作。我们可以通过获得容器类的锁,来使 getLast
和 deleteLast
成为原子操作:
// 程序清单 5-2
public static Object getLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list) {
synchronized (list) {
int lastIndext = list.size() - 1;
list.remove(lastIndext);
}
}
5.2 并发容器
同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发行,当多个线程竞争容器的锁时,吞吐量将严重减低。
另一方面,并发容器是针对多个线程并发访问设计的。在 Java 5.0 中增加了 ConcurrentHashMap
,用来替代同步且基于散列的 Map
,以及 CopyOnWriteArrayList
,用于在遍历操作为主要操作的情况下代替同步的 List
。Java 5.0 增加了两种新的容器类型:Queue
和 BlockingQueue
。
Queue
用来临时保存一组等待处理的元素。它提供了几种实现,包括:ConcurrentLinkedQueue
,这是一个传统的先进先出队列,以及 PriorityQueue
,这是一个(非并发的)优先队列。
BlockingQueue
扩展了 Queue
,增加了可阻塞的插入和获取等操作。
5.2.1 ConcurrentHashMap
ConcurrentHashMap
并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器。
事实上,ConcurrentHashMap
在 Java 1.7 和 Java 1.8 中的实现有一点是不一样的。
在 Java 1.7 中的实现是该书中所介绍的,使用分段锁机制来在粒度更细的层次进行加锁。我们去看 Java 1.7 中的源码实现就会发现,它的分段锁机制由一个 Segment
的数据结构来实现,Segment
自身继承自可重入锁 ReentrantLock
。而且 HashEntry
中的 value
是被 volatile
修饰的,因此它支持任意数量的读取线程并发地访问 Map
,执行读取操作的线程和执行写入操作的线程可以并发地访问 Map
,由 volatile
来保证可见性。
并且它还支持一定数量的写入线程可以并发地修改 Map
,并发数量由 Segment
的数量来决定。这是因为 ConcurrentHashMap
采用 Segment
的分段锁机制。但一个写入线程在一个 Segment
上操作时,因为它只持有这个 Segment
的锁,所以其他的写入线程依然可以进入另外的 Segment
进行操作。
在 Java 1.8 中,ConcurrentHashMap
的实现使用了更进一步的优化。在数据结构上放弃了分段锁的实现,而是采用了根普通 HashMap
差不多的结构,只不过在多线程更新数据结构时采用了 CAS机制 和 synchronized
加锁。
ConcurrentHashMap
与其他并发容器一起增强了同步容器类:它们提供的迭代器不会抛出 ConcurrentModificationException
,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap
返回的迭代器具有弱一致性(Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。
5.2.3 CopyOnWriteArrayList
CopyOnWriteArrayList
用于替代同步 List
,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。
“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制”容器返回的迭代器不会抛出 ConcurrentModificationException
,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。
5.3 阻塞队列和生产者 - 消费者模式
一种常见的生产者 - 消费者设计模式就是线程池与工作队列的组合,在 Executor
任务执行框架中就体现了这种模式。
在类库中包含了 BlockingQueue
的多种实现,其中,LinkedBlockingQueue
和 ArrayBlockingQueue
是 FIFO 队列,二者分别与 LinkedList
和 ArrayList
类似,但比同步 List
拥有更好的并发性能。PriorityBlockingQueue
是一个按优先级排列的队列,当你希望按照某种顺序而不是 FIFO 来处理元素时,这个队列将非常有用。
最后一个 BlockingQueue
实现是 SynchronousQueue
,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移除队列。
5.3.3 双端队列和工作密取
Java 6 增加了两种容器类型,Deque
(发音为 "deck",是 Double Enabled Queue 的缩写)和 BlockingDeque
,它们分别对 Queue
和 BlockingQueue
进行了扩展。Deque
是一个双端队列,实现了在队列头和对列尾的高效插入和移除。具体实现包括 ArrayDeque
和 LinkedBlockingDeque
。
正如阻塞队列适用于生产者 - 消费者模式,双端队列同样适用于另一种相关模式,即工作密取(Work Stealing)。在生产者 - 消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。密取工作模式比传统的生产者 - 消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是从头部获取工作,因此进一步降低了队列上的竞争程度。
在 Java 7 和 Java 8 中,都原生的实现了拥有工作密取机制的线程池,分别是 ForkJoinPool
和 newWorkStealingPool
类。
5.4 阻塞方法与中断方法
Java 中 InterruptedException 的最佳实践
5.5 同步工具类
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调现成的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。
5.5.1 闭锁
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。当闭锁到达结束状态后,将不会再改变状态,闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。
CountDownLatch
是一种灵活的闭所实现,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown
方法递减计数器,表示有一个事件已经发生了,而 await
方法等待计数器达到零,表示所有需要等待的事件都已经发生。如果计数器的值非零,那么 await
会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
// 程序清单 5-11
static class TestHarness {
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
@Override
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
我们使用 startGate
闭锁来达到所有子线程都调用了 start
方法启动后,主线程才开始计时的功能。同时我们使用 endGate
闭锁来达到最后一个子线程执行完时,主线程才结束计时的功能。
5.5.2 FutureTask
FutureTask
也可以用作闭锁。FutureTask
表示的计算是通过 Callable
来实现的,相当于一种可生成结果的 Runnable
,并且可以处于以下 3 种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当 FutureTask
进入完成状态后,它会永远停止在这个状态上。
Future.get
的行为取决于任务的状态。如果任务已经完成,那么 get
会立即返回结果,否则 get
将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask
将计算结果从执行计算的线程传递到获取这个结果的线程,而 FutureTask
的规范确保了这种传递过程能实现结果的安全发布。
static class Preloader {
private final FutureTask<ProductInfo> future =
new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
@Override
public ProductInfo call() throws Exception {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);
public void start() {
thread.start();
}
public ProductInfo get() throws DataLoadException, InterruptedException {
try {
return future.get(); // 可能会阻塞地等待结果
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException) {
throw (DataLoadException) cause;
} else {
throw launderThrowable(cause);
}
}
}
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException) {
return (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new IllegalStateException("Not unchecked", t);
}
}
}
无论任务代码抛出什么异常,都会被封装到一个 ExecutionException
中,并在 Future.get
中被重新抛出。这将使调用 get
的代码变得复杂,因为它不仅需要处理可能出现的 ExecutionException
(以及未检查的 CancellationException
),而且还由于 ExecutionException
是作为一个 Throwable
类返回的,因此处理起来并不容易。我们使用 launderThrowable
方法来处理 get
抛出的各种情况的异常。
5.5.3 信号量
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore
中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire
将阻塞直到有许可(或者直到被中断或者操作超时)。release
操作将返回一个许可给信号量。而且 Semaphore
并不受限于它在创建时的初始许可数量。
计算信号量的一种简化形式是二值信号量,即初始值为 1 的 Semaphore
。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,就拥有了互斥锁。
Semaphore
可以用于构造阻塞对象池:如果将 Semaphore
的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用 acquire
方法获取一个许可,在将资源返回给池之后调用 release
释放许可,那么 acquire
将一直阻塞直到资源不为空。(但是,实现阻塞对象池时,一种更简单的方法是使用 BlockingQueue
来保存池的资源。)
同样,你也可以使用 Semaphore
将任何一种容器变成有界阻塞容器。
// 程序清单 5-14
static class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {
sem.release();
}
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
sem.release();
}
return wasRemoved;
}
}
信号量 sem
的计数值会初始化为容器容量的最大值。add
操作在向底层容器中添加一个元素之前,首先要获取一个许可。如果 add
操作没有添加任何元素,那么会立刻释放许可。同样,remove
操作释放一个许可,使更多的元素能够添加到容器中。底层的 Set
实现并不知道关于边界的任何信息,这是由 BoundedHashSet
来处理的。
5.5.4 栅栏
这个在应用中比较少见,先暂时不看。
5.6 构建高效且可伸缩的结果缓存
本节我们将开发一个高效且可伸缩的缓存,用于改进一个高计算开销的函数。我们首先从简单的 HashMap
开始,然后分析它的并发性缺陷,并讨论如何修复它们。
// 程序清单 5-16
public interface Computable<A, V> {
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunction implements Computable<String, BigInteger> {
@Override
public BigInteger compute(String arg) throws InterruptedException {
// 在经过长时间的计算后
return new BigInteger(arg);
}
}
public class Memoizer1<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new HashMap<>();
private final Computable<A, V> c;
public Memoizer1(Computable<A, V> c) {
this.c = c;
}
// 通过 synchronized 关键字来进行同步
@Override
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
上面的程序逻辑很简单,使用 HashMap
来保存之前计算的结果。compute
方法将首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算的值,否则,将把计算结果缓存在 HashMap
中,然后再返回。
HashMap
不是线程安全的,因此要确保两个线程不会同时访问 HashMap
,Memoizer1
使用了 synchronize
关键字对 compute
方法进行同步。这种方法不会有线程安全性问题,但是会有一个明显的可伸缩性问题:每次只有一个线程能够执行 compute
。这显然不是我们想要的。
// 程序清单 5-17
public class Memoizer2<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Memoizer2(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
上面 Memoizer2
用 ConcurrentHashMap
代替 HashMap
来改进 Memoizer
中糟糕的并发行为。由于 ConcurrentHashMap
是线程安全的,因此在访问底层 Map
时就不需要进行同步,因而避免了在对 Memoizer1
中的 compute
方法进行同步时带来的串行性。
但是 Memoizer2
仍然存在一个问题,当两个线程同时调用 compute
时,可能会导致计算得到相同值,这是因为一个线程不知道另一个线程也在做相同的计算。因此我们的一个解决方案是,一个线程在进行计算前,需要有能力去查看是否有其他线程正在进行同样的计算?
我们已经知道有一个类能基本实现这个功能:FutureTask
。FutureTask
表示一个计算的过程,这个过程可能已经计算完成,也可能正在进行。如果有结果可用,那么 FutureTask.get
将立即返回结果,否则它会一直阻塞,直到结果计算出来再将其返回。
// 程序清单 5-18
public class Memoizer3<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Memoizer3(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException {
// 这里的 get 与下面的 put 不是原子操作,因此同样存在短暂的时间,两个线程可能会同时进入下面的 if 语句
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = () -> c.compute(arg); // 将计算包装成一个 Callable
FutureTask<V> ft = new FutureTask<>(eval);
f = ft;
cache.put(arg, ft);
ft.run(); // 开始执行计算
}
try {
return f.get(); // 阻塞等待计算结果
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
上面的 Memoizer3
几乎是完美的:若结果已经计算出来,那么将立即返回。如果其他线程正在计算该结果,那么新到的线程将一直等待这个结果被计算出来。它只有一个缺陷:正如代码中注释所说的,compute
方法中的 if
代码块仍然是非原子(nonatomic)的 “先检查再执行” 操作,因此两个线程仍有可能在同一时间内调用 compute
来计算相同的值,即二者都没有在缓存中找到期望的值,因此都开始计算。
// 程序清单 5-19
public class Memoizer<A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = () -> c.compute(arg);
FutureTask<V> ft = new FutureTask<>(eval);
f = cache.putIfAbsent(arg, ft); // 使用 putIfAbsent 原子方法来避免两个线程同时执行下面的 run 方法
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
}
上面的 Memoizer
使用了 ConcurrentMap
中的原子方法 putIfAbsent
,避免了 Memozier3
的漏洞。当缓存的是 Future
而不是值时,将导致缓存污染(Cache Pollution)问题:如果某个计算被取消或失败,那么将把 Future
从缓存中移除,这样将来的计算才可能成功。
在 Memoizer
的代码中,实际上永远不会抛出 CancellationException
,但是由于无法保证使用者在 Callable
的实现中会执行怎么样的操作(包括取消操作),因此考虑 catch
住这个异常并没有任何问题。
Memoizer
同样没有解决缓存逾期的问题,
网友评论