线程安全性
- 一个对象是否线程安全,取决于它是否被多个线程访问
- Java的同步机制关键字是synchronized,它提供了一种独占的加锁方式,但“同步”这个术语还包括volatile类型的变量,显式锁以及原子变量
- 由于不恰当的执行时序而出现不正确的结果这种情况成为:静态条件(race condition,这里的condition翻译成情况更合适)
- 大多数竞态条件的本质:基于一种可能失效的观察结果做出判断或者执行某个计算,这种类型的竞态条件称为“先检查后执行”,一个常见的案例就是延迟初始化,代码如下:
@NotThreadSafe
public class LazyInitRace {
private ExpensiveObject instance = null;
public ExpensiveObject getInstance() {
//检查条件,但是检查后条件可能发生变化
if (instance == null)
instance = new ExpensiveObject();
return instance;
}
}
class ExpensiveObject { }
- 复合操作必须是原子的:为了线程安全性,先检查后执行和“读取-修改-写入”等操作必须是原子的,这些操作也称为复合操作,上面的问题可以如下修复:
//线程安全的类
//当无状态的类添加一个状态时,如果该状态完全由线程安全的对象来管理,那么这个类仍然是线程的
@ThreadSafe
public class CountingFactorizer extends GenericServlet implements Servlet {
//使用AtomicLong来代替long类型的计数器
//AtomicLong是线程安全的对象
private final AtomicLong count = new AtomicLong(0);
public long getCount() { return count.get(); }
public void service(ServletRequest req, ServletResponse resp) {
...
count.incrementAndGet();
...
}
}
- 线程安全性的要求中,多个线程之间的操作无论是采用何种执行时序或者交替方式,都要保证不变性条件不被破坏。下面的代码虽然使用了Atomic类,但仍然是线程不安全的,因为不满足该要求:
//一个提供因式分解计算的Servlet,并带有缓存上次计算结果的缓存功能
@NotThreadSafe
public class UnsafeCachingFactorizer extends GenericServlet implements Servlet {
private final AtomicReference<BigInteger> lastNumber
= new AtomicReference<BigInteger>();
private final AtomicReference<BigInteger[]> lastFactors
= new AtomicReference<BigInteger[]>();
public void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
//判断是否缓存计算过
if (i.equals(lastNumber.get()))
//此处get的结果可能是判断过后又修改过的
encodeIntoResponse(resp, lastFactors.get());
else {
//计算
BigInteger[] factors = factor(i);
lastNumber.set(i);
lastFactors.set(factors);
encodeIntoResponse(resp, factors);
}
}
}
正确的写法:
@ThreadSafe
public class CachedFactorizer extends GenericServlet implements Servlet {
@GuardedBy("this") private BigInteger lastNumber;
@GuardedBy("this") private BigInteger[] lastFactors;
@GuardedBy("this") private long hits;
@GuardedBy("this") private long cacheHits;
public synchronized long getHits() {
return hits;
}
public synchronized double getCacheHitRatio() {
return (double) cacheHits / (double) hits;
}
//注意加锁的颗粒度
public void service(ServletRequest req, ServletResponse resp) {
BigInteger i = extractFromRequest(req);
BigInteger[] factors = null;
synchronized (this) {
++hits;
if (i.equals(lastNumber)) {
++cacheHits;
factors = lastFactors.clone();
}
}
if (factors == null) {
factors = factor(i);
synchronized (this) {
lastNumber = i;
lastFactors = factors.clone();
}
}
encodeIntoResponse(resp, factors);
}
}
- synchronized:Java提供了一种内置的锁的机制来支持原子性:同步代码块(用来修饰方法就是同步方法),每个Java对象都可以用做一个实现同步的锁,这些锁被称为内置锁或者监视器锁。内置锁是一种互斥锁,虽然通过它可以很简单的实现线程互斥性,但是会影响性能(活跃性问题或性能问题)
- 可冲入的概念:synchronized是可重入的,如果某个线程视图获取一个已经由它自己持有的锁,那么这个请求会成功。重入意味着获取锁的操作的粒度是线程,而不是调用。如果不支持重入,下面的代码中对父类的调用将会产生死锁:
class Widget {
public synchronized void doSomething() {
}
}
class LoggingWidget extends Widget {
public synchronized void doSomething() {
System.out.println(toString() + ": calling doSomething");
super.doSomething();
}
}
- 用锁来保护状态:对于可能被多个线程同时访问的可变状态变量,在访问它时都需要持有同一个锁,在这种情况下,我们称状态变量是由这个锁保护的(获取锁后只能避免其他线程获取同一个锁,不能阻止其他线程访问该对象)
- 每个共享的和可变的变量应该只由一个锁来保护,从而使维护人员知道是哪一个锁。同时,对于每个包含多个变量的不可变性条件,其中涉及的所有变量都需要由同一个锁来保护
对象的共享
- sychronized不仅仅能够实现原子性,它的另一个好处是实现可见性。即加锁的含义不仅仅局限于互斥行为,还包括内存可见性
- 在没有同步的情况下,编译器、处理器以及运行时等都可能对操作的执行顺序进行一些意想不到的调整,要相对内存操作的顺序进行判断几乎不可能,这种调整也称为重排序
- 失效值:在没有同步的情况下,读取的数据有可能是要给失效值,这会给程序运行带来安全性问题或者活跃问题
- 最低安全性:读取的值可能是失效的,但最起码是之前设置过的值,这是最低有安全性,但是对于非volatile类型的long和double变量,JVM允许将64位的读操作或写操作分成两个32位的操作,这回导致在多线程中读到一个意料之外的值
- 加锁与可见性:加锁(内置所)保证一个线程能够以可预测的方式查看另一个线程的执行结果:例如在M上调用unlock之前的所有操作结果对于在M上调用lock之后的线程都是可见的
- volatile变量:volatile是一种稍弱的同步机制,确保变量的更新能够被其他线程所见,volatile会阻止JVM对该变量的操作和其他的内存操作一起重排序。需要注意的是,volatile只能保证可见性,而不能保证原子性,对于volatile执行++操作不是原子性的。因此当变量的写入依赖变量的当前值时,不能仅用volatile来同步
- 发布和逸出:发布一个对象是指让对象能够在作用域之外的代码中是使用,如果在对象构造完之前就发布该对象,就会破坏线程安全性,当某个不该发布的对象被发布时,这种情况就被称为逸出。下面代码通过内部类将this引用在构造函数中逸出(内部类隐含外部类的引用)
public class ThisEscape {
//不要在构造函数中使this引用逸出
public ThisEscape(EventSource source) {
source.registerListener(new EventListener() {
public void onEvent(Event e) {
doSomething(e);
}
});
}
void doSomething(Event e) {
}
interface EventSource {
void registerListener(EventListener e);
}
interface EventListener {
void onEvent(Event e);
}
interface Event {
}
}
- 线程封闭: 当访问共享的可变数据时,通常需要同步,一种避免使用同步的方式就是不共享数据,这种也称为线程封闭。栈封闭是线程封闭的一种特例,即只能通过局部变量才能访问对象
- ThreadLocal类:维持线程封闭性的一种更规范的方法是使用ThreadLocal,这个类能使线程中的某个值与保存的值的对象关联起来,比如下面的代码使每个线程都会拥有属于自己的连接:
public class ConnectionDispenser {
static String DB_URL = "jdbc:mysql://localhost/mydatabase";
private ThreadLocal<Connection> connectionHolder
= new ThreadLocal<Connection>() {
public Connection initialValue() {
try {
return DriverManager.getConnection(DB_URL);
} catch (SQLException e) {
throw new RuntimeException("Unable to acquire Connection, e");
}
};
};
public Connection getConnection() {
return connectionHolder.get();
}
}
- 不变性:满足同步需求的另一种方法就是使用不可变对象,原子性和可见性相关的问题的来源都是试图访问一个可变的状态相关。如果对象的状态不会改变,那么这些问题的复杂性也就自然消失了。不可变对象一定是线程安全的。不可变对象定义:对象创建以后其状态就不能修改,对象所有的域都是final类型,对象是正确创建的(没有this引用逸出)。它们只有一种状态,并且该状态由构造函数来控制。关键字final用于构造不可变对象。在java的内存模型中,final域还有着特殊的含义,final域能确保初始化过程的安全性,从而可以不受限制的访问不可变对象,并在共享这些对象时无需同步
- 对于访问和更新多个变量时出现的竞争条件问题,可以通过将这些变量全部保存在一个不可变对象中来消除
- 对象发布的坑:有时我们需要发布某个对象(作为public字段并供别人使用),这是要注意发布的正确性,下面是一个不正确发布的案例:
public class StuffIntoPublic {
//不安全的发布,其他线程看到Holder是不一致的,甚至是尚未创建完成的
public Holder holder;
public void initialize() {
holder = new Holder(42);
}
}
同时如果Holder的实现是如下所示,调用该类的assertSanity,有可能抛出异常,原因在于类未完全初始化(重排序导致),变量n还是默认值(一个失效值),如果n声明为final类型,那么Holder将不可变,从而避免出现不正确发布的问题
public class Holder {
private int n;
public Holder(int n) {
this.n = n;
}
public void assertSanity() {
if (n != n)
throw new AssertionError("This statement is false.");
}
}
解决上面的问题的一种方式,是使用静态初始化,它是JVM在类的初始化阶段执行,由于JVM存在这同步机制,因此这种方式初始化的任何对象都可以被安全的的发布:
public static Holder holder= new Holder(42);
- 安全的发布对象的方式:在静态初始化函数中初始化一个对象引用;将对象的引用保存到volatile类型的域或者AtomicReference对象中;将对象的引用保存到某个正确构造对象的final域中;将对象保存到一个由锁保护的域中
- 事实不可变对象:如果一个对象技术是可变的,但是一旦发布后不再改变,把这种称之为“事实不可变对象”。在没有额外同步的情况下,任何线程都可以安全的使用被安全发布的事实不可变对象
- 对象的发布需求取决于它的可变性:不可变的对象可以通过任意机制来发布,事实不可变的对象必须通过安全的方式发布;可变对象必须通过安全方式来发布,并且必须是线程安全的或者由某个锁保护起来
对象的组合
- 对象的状态一般又其域构成;同步策略定义了如何在不违背对象不变条件或者后验条件的情况下(比如下一个状态可预测)对其状态的访问操作进行协同
- 构建线程安全类的最简单的方式:线程封闭:将数据封装在对象内部,并将数据的访问限制在对象的方法上
- 监视器模式:线程封闭一般采用监视器模式,就是利用对象的monitor进行同步(synchronize修饰),案例代码(一个汽车跟踪类)如下:
@ThreadSafe
public class MonitorVehicleTracker {
//MutablePoint是普通的可变对象
//这里Map的key表示汽车的名称
@GuardedBy("this") private final Map<String, MutablePoint> locations;
public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
this.locations = deepCopy(locations);
}
public synchronized Map<String, MutablePoint> getLocations() {
return deepCopy(locations);
}
public synchronized MutablePoint getLocation(String id) {
MutablePoint loc = locations.get(id);
return loc == null ? null : new MutablePoint(loc);
}
public synchronized void setLocation(String id, int x, int y) {
MutablePoint loc = locations.get(id);
if (loc == null)
throw new IllegalArgumentException("No such ID: " + id);
loc.x = x;
loc.y = y;
}
private static Map<String, MutablePoint> deepCopy(Map<String, MutablePoint> m) {
Map<String, MutablePoint> result = new HashMap<String, MutablePoint>();
for (String id : m.keySet())
result.put(id, new MutablePoint(m.get(id)));
return Collections.unmodifiableMap(result);
}
}
- 线程安全的委托:委托其他类实现线程安全:
@ThreadSafe
public class DelegatingVehicleTracker {
//线程安全的Map
private final ConcurrentMap<String, Point> locations;
private final Map<String, Point> unmodifiableMap;
public DelegatingVehicleTracker(Map<String, Point> points) {
locations = new ConcurrentHashMap<String, Point>(points);
unmodifiableMap = Collections.unmodifiableMap(locations);
}
//注意这里没有使用深拷贝,是因为Point是不可变类(final修饰变量),可以安全发布
public Map<String, Point> getLocations() {
return unmodifiableMap;
}
public Point getLocation(String id) {
return locations.get(id);
}
public void setLocation(String id, int x, int y) {
if (locations.replace(id, new Point(x, y)) == null)
throw new IllegalArgumentException("invalid vehicle name: " + id);
}
// Alternate version of getLocations (Listing 4.8)
public Map<String, Point> getLocationsAsStatic() {
return Collections.unmodifiableMap(
new HashMap<String, Point>(locations));
}
}
- 客户端加锁:使用某个对象的客户端代码,使用对象本身用于保护其状态的锁来保护客户端的代码,比如:
@ThreadSafe
class GoodListHelper <E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
//注意这里不能使用GoodListHelper的this引用加锁,因为list里面的同步操作的锁是list引用
//这样导致客户端代码和对象使用的锁不一致,出现list操作线程不安全的问题
public boolean putIfAbsent(E x) {
synchronized (list) {
boolean absent = !list.contains(x);
if (absent)
list.add(x);
return absent;
}
}
}
基础构建模块
- 同步容器包括Vector和HashTable,以及Collections.synchronizedXxx等工厂方法创建,它们实现是将状态封装起来,然后所有的共有方法都进行同步。这样会产生一个问题:效率不高,比如迭代的时候,需要
- 同步容器的问题:单个操作时同步的,但是复合操作不是,比如putIfAbsent操作。由于复合操作存在异常,像Vector会采用fail-fast策略:即出现复合操作产生的并发问题(比如getLast和deleteLast在同一个对象上频繁调用或者迭代期间出现并发修改),直接抛出异常,这往往是不是我们需要的(get和delete一般不产生异常)。更加常见的比如,在调用size和get之间,vector的长度可能会发生变化,这种在迭代中也不可避免,这可能会抛出数组越界异常,当然我们可以通过客户端加锁的方式实现安全迭代,但是迭代期间,对于容器的其他方法的调用都是阻塞的
- 隐藏的迭代器:同步容器的迭代式不安全的,有时候迭代式存在隐含的条件里面,让我们难以发现:比如打印容器变量(会调用toString),或者调用hashCode、equals、contailsAll、removeAll、retailAll方法的时候
- 并发容器之ConcurrentHashMap:采用分段锁来提高并发性能、不会在迭代时对容器加锁(允许并发时修改,也允许size等方法返回估算值)、提供了常见的复合的原子操作比如putIfAbsent等方法
- 并发容器之CopyOnWriteArrayList:用于代替同步List,CopyOnWriteArraySet代替同步Set,copy on write是指在修改的创建一个容器的副本,修改后替换以前的引用,从而在修改的时候不影响读,适合读多写少的情况,确定是内存开销大
- 阻塞队列-生产者消费者模式:阻塞队列提供了put和take的阻塞方法以及带有时间的offer和poll方法。队列可以是有界的也可以是无界的,无界队列的put方法不会阻塞。BlockingQueue简化了生产者-消费者的实现过程,支持任意数量的生产者和消费者。如果阻塞队列不完全复合生产者和消费者设计的需求,可以使用信号量来创建自己的数据结构
- BlockingQueue有多种实现:LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,PriorityBlockingQueue是一个优先级队列,SynchronousQueue是一个只存一个元素的队列。
- 一个案例:生产者搜索问题并放入队列,消费者取出文件名称并建立索引:
public class ProducerConsumer {
static class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
public FileCrawler(BlockingQueue<File> fileQueue,
final FileFilter fileFilter,
File root) {
this.fileQueue = fileQueue;
this.root = root;
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries)
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
fileQueue.put(entry);
}
}
}
static class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
public void run() {
try {
while (true)
indexFile(queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void indexFile(File file) {
// Index the file...
};
}
private static final int BOUND = 10;
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
//主程序
public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
FileFilter filter = new FileFilter() {
public boolean accept(File file) {
return true;
}
};
for (File root : roots)
new Thread(new FileCrawler(queue, filter, root)).start();
for (int i = 0; i < N_CONSUMERS; i++)
new Thread(new Indexer(queue)).start();
}
}
- 双端队列与工作窃取:Java6新增了两种容器类型Deque和BlockingDeque(前者子接口),他们分别对Queue和BlockingQueue进行了扩展。Deque是一个双端队列。具体实现有ArrayDeque和LinkedBlockingDeque。双端队列适用于工作窃取模式。
- InterruptedException处理:阻塞的方法会抛出InterruptedException,表示中断,中断是一种协作机制,并不是强制的去中止线程。对于该异常的处理有两个基本的方式:传递InterruptedException(包括直接抛出或者捕获并处理返回);恢复中断,使上层的代码知道中断了。切记不要吞异常不做处理,下面是恢复中断的一个案例:
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
@Override
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// restore interrupted status
Thread.currentThread().interrupt();
}
}
void processTask(Task task) {
// Handle the task
}
interface Task {
}
}
- 同步工具类之闭锁:
闭锁的含义:延迟线程的进度直到某个中止状态
可能用来:某个计算需要所有的资源都被初始化之后再执行;确保某个服务在其依赖的所有其他服务都已经启动之后再启动;等待某个操作的所有参与者都就绪再执行。
闭锁之CountDownLatch,下面是使用CountDownLatch的代码案例:
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
@Override
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
//上面所有的线程都再等待下面的countDown
startGate.countDown();
//等待上面的线程都执行完后倒数
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
- 同步工具类之FutureTask:FutureTask也可以当成闭锁,下面是使用案例:
public class Preloader {
ProductInfo loadProductInfo() throws DataLoadException {
return null;
}
private final FutureTask<ProductInfo> future =
new FutureTask<>(() -> loadProductInfo());
private final Thread thread = new Thread(future);
public void start() { thread.start(); }
public ProductInfo get()
throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException) {
throw (DataLoadException) cause;
} else {
throw LaunderThrowable.launderThrowable(cause);
}
}
}
interface ProductInfo {
}
}
- 同步工具类之信号量:可以用来自定义自己的同步机制:acquire获取许可,release释放许可,如果信号量是1,可以用作mutex,信号量也可以用作线程池,下面是信号量的一个使用案例:
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {
sem.release();
}
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
sem.release();
}
return wasRemoved;
}
}
- 同步工具类之栅栏:
栅栏与闭锁的区别:闭锁用于等待事件,栅栏用于等待其他线程,栅栏常用在分解和合并任务上,下面是一个案例:
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
() -> mainBoard.commitNewValues());
this.workers = new Worker[count];
for (int i = 0; i < count; i++) {
//分解成子问题
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
}
//工作线程
private class Worker implements Runnable {
private final Board board;
public Worker(Board board) { this.board = board; }
@Override
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++) {
for (int y = 0; y < board.getMaxY(); y++) {
board.setNewValue(x, y, computeValue(x, y));
}
}
try {
//等待所有的线程都工作完后再往下执行
barrier.await();
} catch (InterruptedException | BrokenBarrierException ex) {
return;
}
}
}
private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 0;
}
}
//主方法
public void start() {
//分解工作
for (Worker worker : workers) {
new Thread(worker).start();
}
//合并工作
mainBoard.waitForConvergence();
}
interface Board {
int getMaxX();
int getMaxY();
int getValue(int x, int y);
int setNewValue(int x, int y, int value);
void commitNewValues();
boolean hasConverged();
void waitForConvergence();
Board getSubBoard(int numPartitions, int index);
}
}
另一种形式的栅栏是Exchanger,各方在栅栏上交换数据,一个使用案例:
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<Object> exchanger = new Exchanger<>();
new Thread(() -> {
Object object = new Object();
System.out.println(Thread.currentThread().getName() + "创建的对象是" + object);
try {
object = exchanger.exchange(object);
System.out.println(Thread.currentThread().getName() + "交换后得到的对象是" + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程1").start();
new Thread(() -> {
Object object = new Object();
System.out.println(Thread.currentThread().getName() + "创建的对象是" + object);
try {
TimeUnit.SECONDS.sleep(2);
object = exchanger.exchange(object);
System.out.println(Thread.currentThread().getName() + "交换后得到的对象是" + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程2").start();
}
}
- 一个耗时计算的缓存类涉及案例:
public class Memoizer <A, V> implements Computable<A, V> {
//缓存
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(final A arg) throws InterruptedException {
while (true) {
//这里存Future是为了避免重复计算
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = () -> c.compute(arg);
FutureTask<V> ft = new FutureTask<>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
//计算失败要移除,避免影响后续的计算
cache.remove(arg, f);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
结构化并发应用程序
- 无限制创建线程处理请求的不足:线程生命周期的开销高(创建和销毁线程有代价),大量的空闲线程会占用内存,线程数量有限制,超出会产生异常
- Executor框架:一个基于Executor的Web服务器
public class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec
= Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
private static void handleRequest(Socket connection) {
// request-handling logic here
}
}
- 线程池:
newFixedThreadPool:固定数量的线程池
newCachedThreadPool: 复用创建过的线程,线程数量没有限制,当线程池的当前规模超过了处理需求时,那么回收空闲的线程
newSingleThreadExecutor:单线程的Executor
newScheduledThreadPool:创建了一个固定长度的线程池,而且以延迟或者定时的方式来执行任务
ExecutorService扩展了Executor,主要添加了生命周期的管理,ExecutorService有三种状态:运行、关闭和终止。shutdown方法是执行平缓的关闭过程,shutDownNow执行粗暴的关闭过程
下面是带有声明周期管理的服务代码:
public class LifecycleWebServer {
private final ExecutorService exec = Executors.newCachedThreadPool();
public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
final Socket conn = socket.accept();
exec.execute(new Runnable() {
public void run() {
handleRequest(conn);
}
});
} catch (RejectedExecutionException e) {
if (!exec.isShutdown())
log("task submission rejected", e);
}
}
}
public void stop() {
exec.shutdown();
}
private void log(String msg, Exception e) {
Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
}
void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req))
stop();
else
dispatchRequest(req);
}
interface Request {
}
private Request readRequest(Socket s) {
return null;
}
private void dispatchRequest(Request r) {
}
private boolean isShutdownRequest(Request r) {
return false;
}
}
- 周期任务Timer类的缺点:单线程,在任务执行时间超过定时周期,会出现多次连续调用或者丢弃的情况,Timer线程不捕获异常,抛出异常后,整个Timer都被取消,已经被调度但尚未执行的TimerTask将不会执行,新的任务也不能被调度。下面是一个案例代码,异步任务完成后会放在阻塞队列中,我们可以直接在阻塞队列中获取结果:
public class OutOfTime {
public static void main(String[] args) throws Exception {
Timer timer = new Timer();
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(1);
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(5);
}
static class ThrowTask extends TimerTask {
public void run() {
throw new RuntimeException();
}
}
}
- ExecutorService的所有submit方法都将返回一个Future
- CompletionService将Executor和BlockingQueue的功能融合在一起,可以将Callable的任务交给它来执行,然后使用类似队列操作的take和poll等方法来获取已完成的结果.ExecutorCompletionService是其实现类,并委托Executor来实现计算。下面是一个页面异步渲染的案例:
public abstract class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) {
this.executor = executor;
}
void renderPage(CharSequence source) {
final List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService =
new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
- 对n个任务使用Future调用并带有超时时间有个更简单的方法:invokeAll
public class TimeBudget {
private static ExecutorService exec = Executors.newCachedThreadPool();
//并发执行多个任务
public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,
Comparator<TravelQuote> ranking, long time, TimeUnit unit)
throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company : companies) {
tasks.add(new QuoteTask(company, travelInfo));
}
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes =
new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote(e));
}
}
quotes.sort(ranking);
return quotes;
}
}
取消和关闭
- Java不采用抢占式方法来停止线程,主要通过协作式的机制,也就是通过标志位来判断是否停止线程,案例:
一个取消的任务必须拥有取消策略:即How、When、What,怎么取消、何时检查取消标志以及执行哪些操作
@ThreadSafe
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
@GuardedBy("this") private final List<BigInteger> primes
= new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try {
//让代码执行一秒钟后取消
SECONDS.sleep(1);
} finally {
//中断后也能取消
generator.cancel();
}
return generator.get();
}
}
- 中断取消:上面的取消策略在于如果调用的是阻塞方法,会导致可能永远不会检查标志,中断是实现取消的最合理方式。对中断操作的正确理解是:它并不会真正的中断一个正在运行的线程,而只是发出中断请求,然后由线程在合适的时刻中断自己
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() {
interrupt();
}
}
- 响应中断的两种方式:传递异常、恢复中断状态(使上层代码能够处理)
- 谨慎的中断线程:Future有一个mayInterruptIfRunning方法,表示线程能否被中断,如果是false表示任务没启动就不要启动了。注意的是,除非你清楚线程的中断策略,否则不要指定为true(中断后的执行的代码是线程所有者执行的,可能并不清楚其实现,比如如果线程的拥有者是Executor,它有中断策略,则可以设置为true)。案例代码:
public class TimedRun {
private static final ExecutorService taskExec = Executors.newCachedThreadPool();
public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// task will be cancelled below
} catch (ExecutionException e) {
// exception thrown in task; rethrow
throw launderThrowable(e.getCause());
} finally {
// Harmless if task already completed
task.cancel(true); // interrupt if running
}
}
}
- 并非所有的阻塞机制都能响应中断:比如IO阻塞或者获得某个锁。对于IO可以调用close方法来停止阻塞,或者可以通过Lock类的lockInterruptibly方法来中断。下面是一个案例:
public class ReaderThread extends Thread {
private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
public void interrupt() {
try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuffer(buf, count);
}
} catch (IOException e) { /* Allow thread to exit */
}
}
public void processBuffer(byte[] buf, int count) {
}
}
也可以通过ThreadPoolExecutor的newTaskFor来定制化FutureTask的取消行为:
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
@GuardedBy("this") private Socket socket;
protected synchronized void setSocket(Socket s) {
socket = s;
}
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
@Override
// 重写了cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
/**
* 定义了一个可取消任务的接口
*/
interface CancellableTask <T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
/**
* 自定义的ThreadPoolExecutor
*/
@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {
public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
- ExecutorService中提供了shutdown和shutdownNow来关闭线程服务,后者是强行关闭。一般调用shutdown方法后会调用awaitTermination来等待所有线程关闭
- 毒丸对象:用于生产者消费者场景,毒丸是指一个放在队列上的对象,但放入这个对象后,生产者停止再投放任务,并保证消费者将消费之前提交的任务。下面是前面一个示例的改写:
public class IndexingService {
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
//消费者
private final IndexerThread consumer = new IndexerThread();
//生产者
private final CrawlerThread producer = new CrawlerThread();
//生产队列
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
public IndexingService(File root, final FileFilter fileFilter) {
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
class CrawlerThread extends Thread {
public void run() {
try {
crawl(root);
} catch (InterruptedException e) { /* fall through */
} finally {
//一直循环投放毒丸,因为投放可能会失败
while (true) {
try {
//毒丸对象
queue.put(POISON);
break;
} catch (InterruptedException e1) { /* retry */
}
}
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
//存放任务
queue.put(entry);
}
}
}
}
class IndexerThread extends Thread {
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);
}
} catch (InterruptedException consumed) {
}
}
public void indexFile(File file) {
/*...*/
};
}
public void start() {
producer.start();
consumer.start();
}
public void stop() {
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
}
- 完备的shutdownNow:shutdownNow会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,但是我们不确定哪些任务是未开始的。下面的案例给出了如何在关闭过程中判断正在执行的任务
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
//执行过程中中断的任务
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());
public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}
public void shutdown() {
exec.shutdown();
}
public List<Runnable> shutdownNow() {
return exec.shutdownNow();
}
public boolean isShutdown() {
return exec.isShutdown();
}
public boolean isTerminated() {
return exec.isTerminated();
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return exec.awaitTermination(timeout, unit);
}
public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated())
throw new IllegalStateException(/*...*/);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
//执行代码必须维持中断状态
//任务可能会被执行完成了,如果任务是幂等的是OKDE
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
}
下面是使用TrackingExecutor来保存未完成的任务以备后续继续执行的案例:
public abstract class WebCrawler {
private volatile TrackingExecutor exec;
@GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();
private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
private static final long TIMEOUT = 500;
private static final TimeUnit UNIT = MILLISECONDS;
public WebCrawler(URL startUrl) {
urlsToCrawl.add(startUrl);
}
public synchronized void start() {
exec = new TrackingExecutor(Executors.newCachedThreadPool());
for (URL url : urlsToCrawl) submitCrawlTask(url);
urlsToCrawl.clear();
}
public synchronized void stop() throws InterruptedException {
try {
saveUncrawled(exec.shutdownNow());
if (exec.awaitTermination(TIMEOUT, UNIT))
saveUncrawled(exec.getCancelledTasks());
} finally {
exec = null;
}
}
protected abstract List<URL> processPage(URL url);
private void saveUncrawled(List<Runnable> uncrawled) {
for (Runnable task : uncrawled)
//保存未完成的任务
urlsToCrawl.add(((CrawlTask) task).getPage());
}
private void submitCrawlTask(URL u) {
exec.execute(new CrawlTask(u));
}
private class CrawlTask implements Runnable {
private final URL url;
CrawlTask(URL url) {
this.url = url;
}
private int count = 1;
boolean alreadyCrawled() {
return seen.putIfAbsent(url, true) != null;
}
void markUncrawled() {
seen.remove(url);
System.out.printf("marking %s uncrawled%n", url);
}
public void run() {
for (URL link : processPage(url)) {
if (Thread.currentThread().isInterrupted())
return;
submitCrawlTask(link);
}
}
public URL getPage() {
return url;
}
}
}
- Thread API中提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况。要为线程池中的所有线程设置一个UncaughtExeptionHandler,需要为ThreadPoolExecutor构造函数提供一个ThreadFactory。只有execute提交的任务,才能将它抛出的异常交给未捕获异常处理器,submit提交的任务,异常都被认为是任务返回状态的一部分
- JVM关闭:正常关闭中,JVM首先调用所有已注册的关闭钩子。关闭时,JVM并不会停止或者中断仍在运行的应用程序,当JVM最终结束时,这些线程将强行结束。
- 守护线程:有时候希望创建一个线程来执行一些辅助工作,但又不希望这个线程阻碍JVM的关闭,这种情况下就需要使用守护线程;线程可分为:普通线程和守护线程,正常情况下除了主线程,其他的线程都是守护线程,线程会继承守护状态,因此默认主线程创建的所有线程都是普通线程。应尽可能少的使用守护线程,因为JVM停止时,所有存在的守护线程将会被抛弃。
线程池的使用:
- 如果提交的任务依赖于其他的任务,那么除非线程池无限大,否则将可能造成死锁
- 如果提交到线程池中的任务时间过长,那么,如果在线程池数量不大的情况下,会造成所有的线程会运行时间较长的任务,从而影响整体的响应性
- 拒绝策略有:AbortPolicy, DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy(使用调用者的线程执行代码)
- 自定义ThreadFactory代码:
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable runnable) {
return new MyAppThread(runnable, poolName);
}
}
public class MyAppThread extends Thread {
/**
* 自定义名称
*/
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
//已创建的线程统计
private static final AtomicInteger created = new AtomicInteger();
//存活线程统计
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}
public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
//设置异常处理器
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,
"UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try {
//存活线程统计
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated() {
return created.get();
}
public static int getThreadsAlive() {
return alive.get();
}
public static boolean getDebug() {
return debugLifecycle;
}
public static void setDebug(boolean b) {
debugLifecycle = b;
}
}
- ThreadPoolExecutor创建后仍然可以修改线程池的参数。同时Executors包含了一个unconfigurableExecutorService方法用来包装ExecutorsService,使其配置不能被修改
- ThreadPoolExecutor是可扩展的,提供了这样几个方法:beforeExecute、afterExecute、terminated方法。无论run方法是否正常返回afterExecute都会调用。在线程完成关闭操作时会调用terminated方法,下面是一个案例
/**
* 加了统计线程执行时间功能
*/
public class TimingThreadPool extends ThreadPoolExecutor {
public TimingThreadPool() {
super(1, 1, 0L, TimeUnit.SECONDS, null);
}
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns",
t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns",
totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
活跃性、性能与测试
- 锁顺序死锁:两个线程视图以不同的顺序来获得相同的锁。如果所有线程以固定的顺序来获得锁,那么在程序中就不会出现死锁问题,下面是一个死锁的案例:
public class LeftRightDeadlock {
private final Object left = new Object();
private final Object right = new Object();
public void leftRight() {
synchronized (left) {
synchronized (right) {
doSomething();
}
}
}
public void rightLeft() {
synchronized (right) {
synchronized (left) {
doSomethingElse();
}
}
}
void doSomething() {
}
void doSomethingElse() {
}
}
有时候我们加锁的顺序是一致的但是参数是动态的,也可能发送锁顺序死锁:
public class DynamicOrderDeadlock {
// Warning: deadlock-prone!
public static void transferMoney(Account fromAccount,
Account toAccount,
DollarAmount amount)
throws InsufficientFundsException {
synchronized (fromAccount) {
synchronized (toAccount) {
if (fromAccount.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
}
static class DollarAmount implements Comparable<DollarAmount> {
// Needs implementation
public DollarAmount(int amount) {
}
public DollarAmount add(DollarAmount d) {
return null;
}
public DollarAmount subtract(DollarAmount d) {
return null;
}
public int compareTo(DollarAmount dollarAmount) {
return 0;
}
}
static class Account {
private DollarAmount balance;
private final int acctNo;
private static final AtomicInteger sequence = new AtomicInteger();
public Account() {
acctNo = sequence.incrementAndGet();
}
void debit(DollarAmount d) {
balance = balance.subtract(d);
}
void credit(DollarAmount d) {
balance = balance.add(d);
}
DollarAmount getBalance() {
return balance;
}
int getAcctNo() {
return acctNo;
}
}
static class InsufficientFundsException extends Exception {
}
}
解决办法就是在动态的情况下也要保证加锁对象的顺序是一致的(通过对象的哈希排序):
public class InduceLockOrder {
private static final Object tieLock = new Object();
public void transferMoney(final Account fromAcct,
final Account toAcct,
final DollarAmount amount)
throws InsufficientFundsException {
class Helper {
public void transfer() throws InsufficientFundsException {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
}
}
}
//通过hashCode来固定顺序
int fromHash = System.identityHashCode(fromAcct);
int toHash = System.identityHashCode(toAcct);
if (fromHash < toHash) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
} else if (fromHash > toHash) {
synchronized (toAcct) {
synchronized (fromAcct) {
new Helper().transfer();
}
}
} else {
synchronized (tieLock) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
}
}
}
interface DollarAmount extends Comparable<DollarAmount> {
}
interface Account {
void debit(DollarAmount d);
void credit(DollarAmount d);
DollarAmount getBalance();
int getAcctNo();
}
class InsufficientFundsException extends Exception {
}
}
但是存在对象的hashCode冲突的情况,这中情况下可以再引入一个锁,保证在冲突的情况下只要一个线程在加锁。同时,如果对象信息有唯一信息,比如账号,可以按照这些信息排序后加锁
- 对象协作之间发生的死锁:多个类之间的同步方法的互相调用也可以产生死锁,比如:
public class CooperatingDeadlock {
// Warning: deadlock-prone!
class Taxi {
@GuardedBy("this") private Point location, destination;
private final Dispatcher dispatcher;
public Taxi(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public synchronized Point getLocation() {
return location;
}
/**
* 同步方法默认会获得一个锁,然后调用外部方法又会获取其他得锁,
* 从而有可能导致多个线程加锁顺序不一致
*/
public synchronized void setLocation(Point location) {
this.location = location;
if (location.equals(destination))
dispatcher.notifyAvailable(this);
}
public synchronized Point getDestination() {
return destination;
}
public synchronized void setDestination(Point destination) {
this.destination = destination;
}
}
class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;
public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
public synchronized Image getImage() {
Image image = new Image();
for (Taxi t : taxis)
image.drawMarker(t.getLocation());
return image;
}
}
class Image {
public void drawMarker(Point p) {
}
}
}
解决办法就是:尽量不要使用同步方法,细化加锁范围,使加锁的代码区分开
class CooperatingNoDeadlock {
@ThreadSafe
class Taxi {
@GuardedBy("this") private Point location, destination;
private final Dispatcher dispatcher;
public Taxi(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public synchronized Point getLocation() {
return location;
}
/**
* 细化加锁范围,使加锁的代码不产生嵌套
*/
public synchronized void setLocation(Point location) {
boolean reachedDestination;
synchronized (this) {
this.location = location;
reachedDestination = location.equals(destination);
}
if (reachedDestination)
dispatcher.notifyAvailable(this);
}
public synchronized Point getDestination() {
return destination;
}
public synchronized void setDestination(Point destination) {
this.destination = destination;
}
}
@ThreadSafe
class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;
public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
public Image getImage() {
Set<Taxi> copy;
synchronized (this) {
copy = new HashSet<Taxi>(taxis);
}
Image image = new Image();
for (Taxi t : copy)
image.drawMarker(t.getLocation());
return image;
}
}
class Image {
public void drawMarker(Point p) {
}
}
}
- 开放调用:在调用某个方法时不需要持有锁,上面的案例就是采用开放调用来避避免死锁。在程序中应该尽量使用开放调用
- Java6不会Dump 显式Lock的信息,java6可以但是精度不高
- Thread.yield和sleep的语义都是未定义的,JVM既可以将他们视为空操作,也可以用来作为线程调度的参考
- 活锁:线程不断的重复执行相同的动作,而且总会失败,失败后又去执行(比又放入了队列的开头),解决活锁:引入随机性
减少死锁
- 可伸缩性:当增加计算资源时,程序的吞吐量或者处理能力能相应地增加
- 线程的开销:上下文切换、内存同步(内存屏障会抑制编译器优化)、阻塞
- 减少锁的竞争程度:减少锁的持有时间、降低请求的频率(分段锁,锁对象拆分)、使用带有协调机制的独占锁,这些机制允许更高的并发性(ReadWriteLock)。下面是一个分段的Map代码案例:
public class StripedMap {
// Synchronization policy: buckets[n] guarded by locks[n%N_LOCKS]
private static final int N_LOCKS = 16;
private final Node[] buckets;
private final Object[] locks;
private static class Node {
Node next;
Object key;
Object value;
}
public StripedMap(int numBuckets) {
buckets = new Node[numBuckets];
locks = new Object[N_LOCKS];
for (int i = 0; i < N_LOCKS; i++)
locks[i] = new Object();
}
private final int hash(Object key) {
return Math.abs(key.hashCode() % buckets.length);
}
public Object get(Object key) {
int hash = hash(key);
synchronized (locks[hash % N_LOCKS]) {
for (Node m = buckets[hash]; m != null; m = m.next)
if (m.key.equals(key))
return m.value;
}
return null;
}
public void clear() {
for (int i = 0; i < buckets.length; i++) {
synchronized (locks[i % N_LOCKS]) {
buckets[i] = null;
}
}
}
}
显式锁
- Lock接口提供了一种无条件、可轮询的、定时的以及可中断的锁的获取,加锁和解锁是显式的。解决上面动态顺序死锁的一种方式:
public boolean transferMoney(Account fromAcct,
Account toAcct,
DollarAmount amount,
long timeout,
TimeUnit unit)
throws InsufficientFundsException, InterruptedException {
long fixedDelay = getFixedDelayComponentNanos(timeout, unit);
long randMod = getRandomDelayModulusNanos(timeout, unit);
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (true) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
return true;
}
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() < stopTime)
return false;
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}
- lockInterruptibly方法能够在获得锁的同时响应中断,带超时时间的tryLock也可以
public boolean sendOnSharedLine(String message)
throws InterruptedException {
lock.lockInterruptibly();
try {
return cancellableSendOnSharedLine(message);
} finally {
lock.unlock();
}
}
- ReentrantLock提供了创建公平锁和非公平锁的选项。大多数情况下,非公平锁性能要高于公平锁的性能(公平锁挂起和恢复线程的开销更大)
- 内置锁和Lock的取舍:建议当内置锁(synchronized)不满足需求时,才考虑使用ReentrantLock。Java6已经优化了内置锁,性能和ReentrantLock性能相当。内置锁在Dump中更方便查看哪些线程获取了锁
构建自定义的同步工具
- 状态依赖:同步工具类中很多操作有着基于状态的前提条件,比如队列非空或者任务完成等等,这些前提条件在并发的环境下是多变的,因此在条件不满足的情况下,需要多次尝试。下面介绍几种有界缓存的实现,其中将采用不同的方法来解决前提条件失败的问题:
一种简单的处理就是条件不满足,对外抛出异常,让调用者处理尝试:
/**
* 有界队列的基类
*/
@ThreadSafe
public abstract class BaseBoundedBuffer <V> {
@GuardedBy("this") private final V[] buf;
@GuardedBy("this") private int tail;
@GuardedBy("this") private int head;
@GuardedBy("this") private int count;
protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length)
tail = 0;
++count;
}
protected synchronized final V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length)
head = 0;
--count;
return v;
}
public synchronized final boolean isFull() {
return count == buf.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}
前提条件不满抛出异常:
@ThreadSafe
public class GrumpyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
public GrumpyBoundedBuffer() {
this(100);
}
public GrumpyBoundedBuffer(int size) {
super(size);
}
public synchronized void put(V v) throws BufferFullException {
if (isFull())
throw new BufferFullException();
doPut(v);
}
public synchronized V take() throws BufferEmptyException {
if (isEmpty())
throw new BufferEmptyException();
return doTake();
}
}
/**
* 使用案例,重试的策略由客户端执行
*/
class ExampleUsage {
private GrumpyBoundedBuffer<String> buffer;
int SLEEP_GRANULARITY = 50;
void useBuffer() throws InterruptedException {
while (true) {
try {
String item = buffer.take();
// use item
break;
} catch (BufferEmptyException e) {
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
}
问题:调用者需要处理异常,同时队列为空并不是一个异常情况。另一种实现是通过轮询和休眠来实现简单的阻塞等待:
@ThreadSafe
public class SleepyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
int SLEEP_GRANULARITY = 60;
public SleepyBoundedBuffer() {
this(100);
}
public SleepyBoundedBuffer(int size) {
super(size);
}
public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
//采用睡眠等待
Thread.sleep(SLEEP_GRANULARITY);
}
}
public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty())
return doTake();
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
问题:睡眠可能会中断,客户端又要出中断异常。因此上面两种方式都不是很好的实现
- 对象的条件队列:基于内置锁的阻塞队列,通过内置锁和wait、notify、notifyAll来配置使用。下面使用条件队列来实现有界缓存的示例:
@ThreadSafe
public class BoundedBuffer <V> extends BaseBoundedBuffer<V> {
// CONDITION PREDICATE: not-full (!isFull())
// CONDITION PREDICATE: not-empty (!isEmpty())
public BoundedBuffer() {
this(100);
}
public BoundedBuffer(int size) {
super(size);
}
// BLOCKS-UNTIL: not-full
public synchronized void put(V v) throws InterruptedException {
while (isFull())
wait();
doPut(v);
notifyAll();
}
// BLOCKS-UNTIL: not-empty
public synchronized V take() throws InterruptedException {
while (isEmpty())
wait();
V v = doTake();
notifyAll();
return v;
}
// BLOCKS-UNTIL: not-full
// Alternate form of put() using conditional notification
public synchronized void alternatePut(V v) throws InterruptedException {
while (isFull())
wait();
boolean wasEmpty = isEmpty();
doPut(v);
if (wasEmpty)
notifyAll();
}
}
- 条件谓词:就是指等待的条件,是使某个操作成为状态依赖操作的前提条件
- 条件等待中的三元关系:加锁、wait方法、条件谓词。条件谓词中包含多个变量,而状态变量由一个锁来保护,因此在检查条件谓词之前必须先持有这个锁
- 一个线程被notifyAll唤醒时,并不意味着谓词已经成真了(存在多个条件共用一个锁),因此线程唤醒后要重新判断条件
- 当等待一个条件时,一定要确保条件谓词在变为真时通过某种方式发出通知,比如notify和notifyAll,推荐使用notifyAll,因为notify只会唤醒一个线程,表示单进单出,而且被唤醒的线程是不是在该谓词上等待是不确定的。同时为了解决不必要的唤醒,可以通过判断条件确实发生转换后才发出通知(上面的alternatePut方法就是采用这种方式。
下面是一个使用wait和notifyAll来实现可重新关闭的阀门:
@ThreadSafe
public class ThreadGate {
// CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
@GuardedBy("this") private boolean isOpen;
@GuardedBy("this") private int generation;
public synchronized void close() {
isOpen = false;
}
public synchronized void open() {
++generation;
isOpen = true;
notifyAll();
}
// BLOCKS-UNTIL: opened-since(generation on entry)
public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
//注意这里有两个判断条件:为open或者调用过open(generation会变化)时停止等待
//原因在于可能多线程竞争唤醒后又关闭了(此时generation会变化)
while (!isOpen && arrivalGeneration == generation)
wait();
}
}
- 显式Condition对象:显式锁Lock和内置锁以及显式Condition和内置条件队列是相对应的组合。Condition提供了await、signal、singalAll方法。一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。下面是使用Condition实现有界缓存队列:
@ThreadSafe
public class ConditionBoundedBuffer <T> {
protected final Lock lock = new ReentrantLock();
// CONDITION PREDICATE: notFull (count < items.length)
private final Condition notFull = lock.newCondition();
// CONDITION PREDICATE: notEmpty (count > 0)
private final Condition notEmpty = lock.newCondition();
private static final int BUFFER_SIZE = 100;
@GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE];
@GuardedBy("lock") private int tail, head, count;
// BLOCKS-UNTIL: notFull
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// BLOCKS-UNTIL: notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T x = items[head];
items[head] = null;
if (++head == items.length)
head = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
- 使用Lock来实现信号量:
public class SemaphoreOnLock {
private final Lock lock = new ReentrantLock();
// CONDITION PREDICATE: permitsAvailable (permits > 0)
private final Condition permitsAvailable = lock.newCondition();
@GuardedBy("lock") private int permits;
SemaphoreOnLock(int initialPermits) {
lock.lock();
try {
permits = initialPermits;
} finally {
lock.unlock();
}
}
// BLOCKS-UNTIL: permitsAvailable
public void acquire() throws InterruptedException {
lock.lock();
try {
while (permits <= 0)
permitsAvailable.await();
--permits;
} finally {
lock.unlock();
}
}
public void release() {
lock.lock();
try {
++permits;
permitsAvailable.signal();
} finally {
lock.unlock();
}
}
}
- 使用AQS实现二元闭锁:
@ThreadSafe
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() {
sync.releaseShared(0);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
// Succeed if latch is open (state == 1), else fail
// 1打开,0关闭
return (getState() == 1) ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored) {
setState(1); // Latch is now open
return true; // Other threads may now be able to acquire
}
}
}
原子变量与非阻塞同步机制
- 并发包中的很多类都比synchronized机制有更高的性能和可伸缩性,这主要来源于原子变量和非阻塞的同步机制。锁的劣势:线程的挂起和恢复等过程存在很大的开销。volatile变量的局限性:不能构建原子的复合操作
- 大部分处理器支持CAS指令来实现乐观锁,下面的是模拟CAS操作的代码:
@ThreadSafe
public class SimulatedCAS {
@GuardedBy("this") private int value;
public synchronized int get() {
return value;
}
public synchronized int compareAndSwap(int expectedValue,
int newValue) {
int oldValue = value;
if (oldValue == expectedValue)
value = newValue;
return oldValue;
}
public synchronized boolean compareAndSet(int expectedValue,
int newValue) {
return (expectedValue
== compareAndSwap(expectedValue, newValue));
}
}
使用CAS实现的一个线程安全的计数器:
@ThreadSafe
public class CasCounter {
private SimulatedCAS value;
public int getValue() {
return value.get();
}
public int increment() {
int v;
do {
v = value.get();
} while (v != value.compareAndSwap(v, v + 1));
return v + 1;
}
}
- 当竞争不高的时候,基于CAS的计数器在性能上远远超过了基于锁的计数器
- 在高度竞争的情况下爱,锁的性能超过原子变量的性能。但在更真实的竞争情况下,原子变量的性能将超过锁的性能
- 一个线程的失败或挂起不会导致其他线程失败或挂起,这种算法被称为非阻塞算法。一个非阻塞的栈的案例:
@ThreadSafe
public class ConcurrentStack <E> {
AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
public void push(E item) {
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
private static class Node <E> {
public final E item;
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}
非阻塞的列表:
@ThreadSafe
public class LinkedQueue <E> {
private static class Node <E> {
final E item;
final AtomicReference<LinkedQueue.Node<E>> next;
public Node(E item, LinkedQueue.Node<E> next) {
this.item = item;
this.next = new AtomicReference<LinkedQueue.Node<E>>(next);
}
}
/**
* 哨兵节点
*/
private final LinkedQueue.Node<E> dummy = new LinkedQueue.Node<E>(null, null);
//头结点和尾结点开始都指向这个哨兵节点
private final AtomicReference<LinkedQueue.Node<E>> head
= new AtomicReference<LinkedQueue.Node<E>>(dummy);
private final AtomicReference<LinkedQueue.Node<E>> tail
= new AtomicReference<LinkedQueue.Node<E>>(dummy);
public boolean put(E item) {
LinkedQueue.Node<E> newNode = new LinkedQueue.Node<E>(item, null);
while (true) {
LinkedQueue.Node<E> curTail = tail.get();
LinkedQueue.Node<E> tailNext = curTail.next.get();
if (curTail == tail.get()) {
if (tailNext != null) {
//这种情况就是处于下面两个compareAndSet方法的中间
//因此这里获得时间片之后,帮助处于中间状态的线程继续完成它的操作
tail.compareAndSet(curTail, tailNext);
} else {
// 处于稳定状态时的操作
if (curTail.next.compareAndSet(null, newNode)) {
// Insertion succeeded, try advancing tail
tail.compareAndSet(curTail, newNode);
return true;
}
}
}
}
}
}
Java内存模型
- JVM必须遵循一组最小保证,这组保证规定了对变量的写入操作何时对于其他线程可见。JVM为程序中所有的操作定义了一个偏序关系,称之为Happens-Before,这里简称为HP。要想执行操作B的线程能够看到A的结果,A和B必须满足HP关系。如果缺乏HP关系,JVM可以对他们任意的重排序,规则包括:
程序顺序规则:如果操作A在操作B之前,那么线程A操作将在B操作之前执行
监视器锁规则:在监视器锁上的解锁操作必在同一个监视器锁上的加锁操作之前执行
volatile变量规则:对volatile变量的写入操作必须在对该变量的读操作之前执行
启动规则:在线程上对start的调用必须在该线程中执行任何操作之前执行
线程结束规则:线程中的任何操作都必须在其他线程检测到该线程已经结束之前执行,或者从join中成功返回,或者调用isAlive时返回false
中断规则:当一个线程在另一个线程调用interrupt时,必须在被中断线程检测到interrupt调用之前执行
终结器规则:对象的构造函数必须在启动该对象的终结器之前执行完成
传递性:如果操作A在操作B之前执行,并且操作B在操作C之前执行,那么操作A必须在操作C之前执行
网友评论