中级09 - Java多线程初步
介绍多线程带来的问题,以及基本解决方案。
- 竞争条件带来的数据错误问题
- 死锁的原理、排查与防范
- 线程安全:同步方法、并发工具包
一 、线程不安全的表现 竞争条件带来的数据错误
- i++
- if-then-do
如果最终结果不正确,那么程序跑地再快也没有意义。
二、死锁的产生、排查和防范
1. 产生
Java 中某个锁只能同时被一个线程持有,当两个线程互相在等待对方持有的锁时就形成了死锁。
为了便于理解,想象一手交钱一手交货的问题:
A 持有现金,B 持有粉,但为了掩人耳目,只能通过一个狭小的窗口进行交易,无法同时一手交钱一手交货,,俩人都持有对方需要的资源,并且也都在等待对方手上的资源,但谁也不愿先给出自己的资源。
此时便形成了死锁。
类似的还有 哲学家就餐问题。
public class DeadlockExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
new Thread1().start();
new Thread2().start();
}
static class Thread1 extends Thread {
@Override
public void run() {
synchronized (lock1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock2) {
System.out.println("看不到我");
}
}
}
}
static class Thread2 extends Thread {
@Override
public void run() {
synchronized (lock2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock1) {
System.out.println("还是看不到我");
}
}
}
}
}
2. 排查
使用jps
列出当前 Java 进程 ID:
jps
17876 Jps
17928 RemoteMavenServer36
9224 Launcher
19260
9164 DeadlockExample
jstack 打印给定 Java 进程中的所有栈信息进行排查:
jstack 9164
2019-09-29 17:01:49
# ...
# 省略
# ...
Found one Java-level deadlock:
=============================
"Thread-1":
waiting to lock monitor 0x0000000002d8b648 (object 0x00000000d61a4cb0, a java.lang.Object),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x0000000002d8a1a8 (object 0x00000000d61a4cc0, a java.lang.Object),
which is held by "Thread-1"
Java stack information for the threads listed above:
===================================================
"Thread-1":
at com.github.hcsp.calculation.DeadlockExample$Thread2.run(DeadlockExample.java:42)
- waiting to lock <0x00000000d61a4cb0> (a java.lang.Object)
- locked <0x00000000d61a4cc0> (a java.lang.Object)
"Thread-0":
at com.github.hcsp.calculation.DeadlockExample$Thread1.run(DeadlockExample.java:24)
- waiting to lock <0x00000000d61a4cc0> (a java.lang.Object)
- locked <0x00000000d61a4cb0> (a java.lang.Object)
Found 1 deadlock.
3. 防范
所有的线程都按照相同的顺序获得资源的锁。
三、实现线程安全的基本手段
1. 不可变类 Integer/String 等
2. 同步方法
2.1 synchronized
- synchronized (一个对象) 把这个对象当成锁
- static synchronized方法 把 Class 对象当成锁
- 实例的 synchronized方法 把该实例当成锁(等价于 synchronized (this) {})
还是之前多个线程同时修改共享变量时的问题,现在采用 synchronizedd 同步块,使得同一时刻只能有一个线程持有锁,去执行同步块中的代码:
public class Test {
private static int i = 0;
private static final Object lock = new Object(); // 定义一个锁
public static void main(String[] args) {
for (int j = 0; j < 1000; j++) {
new Thread(Test::modifySharedVariable).start();
}
}
private static void modifySharedVariable() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) { // 使用同步块
i++;
System.out.println("i = " + i);
}
}
}
除了使用 synchronized 同步块,还可以使用 synchronized 关键字:
private synchronized static void modifySharedVariable() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
System.out.println("i = " + i);
}
2.2 Collections.synchronized*
如 Collections.synchronizedMap 等,基本只是把原来的 map 相关方法使用 synchronized 同步块包装了一遍。
但是注意,使用 Collections.synchronized* 时,如果存在非集合本身的,未经 synchronized 的操作,那么并不能保证线程安全,还是需要 synchronized 同步块。
3. 并发工具包
3.1 ConcurrentHashMap
下图中的 Collection 和 Map 都是线程不安全的:
[图片上传失败...(image-96a1e3-1575509314567)]
多个线程并发访问 map 是不安全的:
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class Main {
private static final Map<Integer, Integer> map = new HashMap<>();
public static void main(String[] args) {
for (int j = 0; j < 1000; j++) {
new Thread(Main::concurrentlyAccess).start();
}
}
private static void concurrentlyAccess() {
Integer r = new Random().nextInt();
map.put(r, r);
for (Integer i : map.keySet()) {
System.out.println(i);
}
}
}
报错如下:
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
at com.github.hcsp.calculation.Main.concurrentlyAccess(Main.java:25)
at java.lang.Thread.run(Thread.java:748)
解决办法除了使用刚才提到的 synchronized,还可以无脑地使用 JUC 并发工具包提供的线程安全的 ConcurrentHashMap 替换原本涉及到线程安全问题时的 HashMap。
3.2 Atomic*
可以创建具有原子性操作的数据,如 AtomicInteger。
3.3 ReentrantLock 可重入锁
ReentrantLock 通过一个双向链表来实现锁机制,每个节点代表一条线程,head 节点拥有
锁,其他节点均被挂起等待唤醒。等到当前锁释放成功后,会唤醒链表中的下一个节点,并将其更新为新的 head 节点。
可重入锁是指:当线程请求一个由其它线程持有的锁时,该线程会阻塞,而当线程请求由自己持有的锁时,如果该锁是可重入锁,请求就会成功,否则阻塞。
即当一个线程获取了某个对象锁后,还可以再次获得该对象锁。
ReentrantLock 作为一个可重入锁,内部是通过 state 作为计数器来统计总共执行了多少次 lock 方法,如果同一个锁 lock 两次,state 为 2,那么只调用一次 unlock,state 为 1,锁尚未释放,再 unlock 一次,state 为 0,锁成功释放。
ReentrantLock 和 synchronized 方法/声明具有相似的行为和语义,但是更灵活强大:
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest {
private static int i = 0;
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(ReentrantLockTest::concurrentlyAccess).start();
}
}
private static void concurrentlyAccess() {
lock.lock(); // block until condition holds
// 这里可以掩饰 lock.lock(); //
try {
i = i + 1;
System.out.println("i = " + i);
} finally {
lock.unlock();
}
}
}
可重入锁的作用就是为了避免死锁,因为 Java 中某个锁只能同时被一个线程持有,如下例所示,如果 synchronized 锁不可重入,不可再次获得,将会造成死锁,该线程一直等待进入方法 b 需要的这把对象锁,而该锁明明又被自己占用着。
所以 synchronized 作为可重入锁,进入 a 方法时持有当前对象锁,紧接着进入 b 方法时又要请求相同的锁,此时可重复持有当前对象锁:
private synchronized void a() {
b();
}
private synchronized void b() {
}
四、线程的状态与Object类中的线程方法
1. 线程的状态
Java中线程的状态分为 6 种:
- 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
- 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
线程对象创建后,其他线程(比如main线程)调用了该对象的 start() 方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取 CPU 的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得 CPU 时间片后变为运行中状态(running)。
-
阻塞(BLOCKED):线程因为需要等待一个锁时被阻塞的状态,拿不到锁,锁在别的线程手里。
-
等待(WAITING):前提是线程已经拥有锁了,然后进入该状态,等待其他线程做出一些特定动作(通知或中断)
-
超时等待(TIMED_WAITING):该状态不同于 WAITING,它可以在指定的时间后自行返回。
-
终止(TERMINATED):表示该线程已经执行完毕。
2. Obect 类中的线程方法
为什么 Java 中所有的对象都可以作为锁?因为所有的对象都继承自 Object 类,而 Object 类提供了线程相关的方法。
2.1 wait
只有持有某个对象锁的线程(假设是 A)才能调用该对象的 wait() 方法,调用后 A 会释放锁并进入waiting 状态,直到其他某个线程(B)抢到了对象锁,调用了该对象锁的 notify()/notifyAll() 方法后,被 wait 的 A 线程才能重新获得锁的所有权并继续执行代码。
2.2 notify
B 线程调用 notify() 唤醒一个正在等待该锁的线程 A,若有多个线程都在等待,那么也只会根据 JVM 内部机制唤醒其中一个。
2.3 notifyAll
notifyAll() 唤醒所有正在等待该对象锁的线程。
notify()/notifyAll() 方法所在的 synchronized 方法或块结束后,线程 B 释放该对象锁,然后这些被唤醒的线程会和其他线程一起按照常规自由竞争该锁。
五、生产者和消费者模型(多线程经典问题)
每当条件满足时,一个线程被唤醒,做完事情后,让自己继续沉睡的同时顺便唤醒别的线程。
请实现一个生产者/消费者模型,其中:
生产者生产10个随机的整数供消费者使用(随机数可以通过new Random().nextInt()获得)
使得标准输出依次输出它们,例如:
Producing 42
Consuming 42
Producing -1
Consuming -1
...
Producing 10086
Consuming 10086
Producing -12345678
Consuming -12345678
下面演示三种实现方式:
- wait/notify/notifyAll
- Lock/Condition
- BlockingQueue
1. wait/notify/notifyAll
使用最基本的 Object 类中的线程方法和 synchronized。
Boss,整体调度:
package com.github.hcsp.multithread;
public class Boss {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
Container container = new Container();
Producer producer = new Producer(container, lock);
Consumer consumer = new Consumer(container, lock);
producer.start();
consumer.start();
producer.join();
producer.join();
}
}
Container,盛放制造品的容器:
package com.github.hcsp.multithread;
import java.util.Optional;
public class Container {
private Optional<Integer> value = Optional.empty();
public Optional<Integer> getValue() {
return value;
}
public void setValue(Optional<Integer> value) {
this.value = value;
}
}
Producerr,生产者:
package com.github.hcsp.multithread;
import java.util.Optional;
import java.util.Random;
public class Producer extends Thread {
private Container container;
private Object lock;
public Producer(Container container, Object lock) {
this.container = container;
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (lock) {
while (container.getValue().isPresent()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer r = new Random().nextInt();
container.setValue(Optional.of(r));
System.out.println("Producing " + r);
lock.notify();
}
}
}
}
Consumer,消费者:
package com.github.hcsp.multithread;
import java.util.Optional;
public class Consumer extends Thread {
private Object lock;
private Container container;
public Consumer(Container container, Object lock) {
this.container = container;
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (lock) {
while (!container.getValue().isPresent()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer v = container.getValue().get();
container.setValue(Optional.empty());
System.out.println("Consuming " + v);
lock.notify();
}
}
}
}
2. Lock/Condition
使用 ReentrantLock 和 Condition。
ReentrantLock 比 synchronized 自动加解锁机制更灵活。
Condition 是个接口,在 Java1.5 中出现,用来替代传统的 Object Monitor Methods 中 wait/notify/notifyAll() 的 是 await/signal/signalAll(),这种方式使线程间协作更加安全和高效,推荐使用。
Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition()。
Condition 抽离出了线程调用机制,可以和任意的锁结合在一起,使得同一个对象可以有多个等待队列(一个锁可以 newCondition() 多次)。
使得在下面的例子中,当容器满了或者空了时,同一时间内只需要通知一条对应的等待中的线程。
Boss2:
package com.github.hcsp.multithread;
import java.util.concurrent.locks.ReentrantLock;
public class Boss2 {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Container2 container = new Container2(lock);
Producer2 producer = new Producer2(container, lock);
Consumer2 consumer = new Consumer2(container, lock);
consumer.start();
producer.start();
producer.join();
producer.join();
}
}
Container2:
package com.github.hcsp.multithread;
import java.util.Optional;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Container2 {
private Condition notConsumedYet; // 尚未被消费掉
private Condition notProducedYet; // 尚未被生产出来
private Optional<Integer> value = Optional.empty();
public Container2(ReentrantLock lock) {
this.notConsumedYet = lock.newCondition();
this.notProducedYet = lock.newCondition();
}
public Condition getNotConsumedYet() {
return notConsumedYet;
}
public Condition getNotProducedYet() {
return notProducedYet;
}
public Optional<Integer> getValue() {
return value;
}
public void setValue(Optional<Integer> value) {
this.value = value;
}
}
Producer2:
package com.github.hcsp.multithread;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;
public class Producer2 extends Thread {
private Container2 container;
private ReentrantLock lock;
public Producer2(Container2 container, ReentrantLock lock) {
this.container = container;
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
while (container.getValue().isPresent()) {
try {
container.getNotProducedYet().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer r = new Random().nextInt();
container.setValue(Optional.of(r));
System.out.println("Producing " + r);
container.getNotConsumedYet().signal();
} finally {
lock.unlock();
}
}
}
}
Consumer2:
package com.github.hcsp.multithread;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
public class Consumer2 extends Thread {
private Container2 container;
private ReentrantLock lock;
public Consumer2(Container2 container, ReentrantLock lock) {
this.container = container;
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
while (!container.getValue().isPresent()) {
try {
container.getNotConsumedYet().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer v = container.getValue().get();
container.setValue(Optional.empty());
System.out.println("Consuming " + v);
container.getNotProducedYet().signal();
} finally {
lock.unlock();
}
}
}
}
另外说一下个人对以上这 2 种生产者和消费者模型实现方式的理解:
设置条件判断是因为需要先生产后才能消费, 未消费前也不能再生产, 所以当生产或消费条件不满足时, 需要令当前线程进入 waiting。
另一方面,是为了当前线程被中断和假唤醒时,要继续进入 waiting, 并且要用循环一直"盯"着。
另外,在 for 循环中,synchronized 和 ReentrantLock 作为可重入锁可以重复加锁, 条件不成熟时也可重新进入锁,故也需要条件判断。
3. BlockingQueue
BlockingQueue 取回元素时要等待队列非空,存储元素时要等待队列非满,可以方便的实现生产者和消费者模型,创建两条 BlockingQueue,一条负责管理产品,一条负责管理调度信号,不再需要手动创建容器。
Boss3:
package com.github.hcsp.multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Boss3 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(1);
BlockingQueue signalQueue = new ArrayBlockingQueue(1);
Producer3 producer = new Producer3(queue, signalQueue);
Consumer3 consumer = new Consumer3(queue, signalQueue);
producer.start();
consumer.start();
producer.join();
producer.join();
}
}
Producer3:
package com.github.hcsp.multithread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class Producer3 extends Thread {
BlockingQueue<Integer> queue;
BlockingQueue<Integer> signalQueue;
public Producer3(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {
this.queue = queue;
this.signalQueue = signalQueue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
int r = new Random().nextInt();
System.out.println("Producing " + r);
try {
queue.put(r);
signalQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer3:
package com.github.hcsp.multithread;
import java.util.concurrent.BlockingQueue;
public class Consumer3 extends Thread {
BlockingQueue<Integer> queue;
BlockingQueue<Integer> signalQueue;
public Consumer3(BlockingQueue<Integer> queue, BlockingQueue<Integer> signalQueue) {
this.queue = queue;
this.signalQueue = signalQueue;
}
@Override
public void run () {
for (int i = 0; i < 10; i++) {
try {
System.out.println("Consuming " + queue.take());
signalQueue.put(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
六、线程池与 Callable/Future
1. Callable/Future
1.1 Callable
Callable 接口引入自 Java1.5,和 Runnable 一样,它们的实例设计用于在其他线程中执行,但不同的是, Callable 可以返回值,也能抛异常。
1.2 Future
Future 接口代表⼀个“未来才会返回的结果”,调用 get() 方法会等待直到拿到计算结果。
2. 什么是线程池
2.1 定义
Java 的线程调度完全依赖于操作系统的线程调度,线程是昂贵的,不能无节制地开辟线程,以免将操作系统的资源耗尽,所以线程池是预先定义好的若干个线程。
2.2 Java 中的线程池
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 简单场景下为方便起见,直接使用 Executors 工具类快速创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// submit 方法会立刻返回一个 Future(类似于 JS 中的 Promise)
// 提交的任务会异步执行,不会阻塞当前线程
Future<Integer> future1 = threadPool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 0;
}
});
Future<String> future2 = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "ojbk";
}
});
Future<Object> future3 = threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new RuntimeException();
}
});
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
}
2.3 线程池的构造函数(有坑待填)
3. 实战:多线程的 WordCount
创建 WordCount 对象时传入文件列表,可统计这些文件中的单词数。
使用方法:
List<File> files = xxx;
WordCount wordCount = new WordCount(10);
Map<String, Integer> countResult = wordCount.count(files);
System.out.println(countResult);
WordCount 类的实现:
package com.github.hcsp.multithread;
import org.apache.commons.io.FileUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class WordCount {
private final int threadNum;
private ExecutorService threadPool;
public WordCount(int threadNum) {
threadPool = Executors.newFixedThreadPool(threadNum);
this.threadNum = threadNum;
}
/**
* 统计文件中各单词的数量
*
* @param files 文件列表
* @return 单词统计结果
* @throws IOException 文件读写出错时抛出
* @throws ExecutionException Future 取回结果出错时抛出
* @throws InterruptedException 线程被中断时抛出
*/
public Map<String, Integer> count(List<File> files) throws IOException, ExecutionException, InterruptedException {
BufferedReader reader = new BufferedReader(new FileReader(mergeFilesIntoSingleFile(files)));
List<Future<Map<String, Integer>>> futures = new ArrayList<>();
// 开辟若干个线程,每个线程读取文件的一行内容,并将单词统计结果返回
// 最后主线程将工作线程返回的结果汇总在一起
for (int i = 0; i < threadNum; i++) {
futures.add(threadPool.submit(new WorkerJob(reader)));
}
// 最终结果集
Map<String, Integer> finalResult = new HashMap<>();
// 将futures中的每个子结果集合并到终集中
for (Future<Map<String, Integer>> future : futures) {
Map<String, Integer> resultFromWorker = future.get();
mergeWorkerResultIntoFinalResult(resultFromWorker, finalResult);
}
threadPool.shutdown();
return finalResult;
}
/**
* 将文件列表中的文件合并为一个文件
*
* @param files 文件列表
* @return 结果文件
* @throws IOException 文件读写出错时抛出
*/
private File mergeFilesIntoSingleFile(List<File> files) throws IOException {
File result = File.createTempFile("tmp", "");
for (File file : files) {
String encoding = "UTF-8";
FileUtils.write(result, FileUtils.readFileToString(file, encoding), encoding, true);
}
return result;
}
/**
* 将子集合并到终集中
*
* @param resultFromWorker 终集
* @param finalResult 子集
*/
private void mergeWorkerResultIntoFinalResult(Map<String, Integer> resultFromWorker,
Map<String, Integer> finalResult) {
for (Map.Entry<String, Integer> entry : resultFromWorker.entrySet()) {
String word = entry.getKey();
int mergedResult = finalResult.getOrDefault(word, 0) + entry.getValue();
finalResult.put(word, mergedResult);
}
}
static class WorkerJob implements Callable<Map<String, Integer>> {
private BufferedReader reader;
private WorkerJob(BufferedReader reader) {
this.reader = reader;
}
@Override
public Map<String, Integer> call() throws Exception {
String line;
Map<String, Integer> result = new HashMap<>();
while ((line = reader.readLine()) != null) {
String[] words = line.split(" ");
// System.out.println(Thread.currentThread().getName());
// System.out.println(line);
// System.out.println();
for (String word : words) {
result.put(word, result.getOrDefault(word, 0) + 1);
}
}
return result;
}
}
}
简单删除一些代码就可以实现单文件统计(略)。
参考:
网友评论