概述
- CPU、内存和 IO 之间存在着巨大性能差异
- 多核CPU的发展帮助了多线程的生成
- 线程的本质 => 一个可以执行代码的工人
- 优点:多个执行流,并发执行
- 缺点:
- 慢 => 切换上下文(1μs == 10^3 ns)(CPU数量远远小于线程数量) => 减少上下文切换 => 协程(用户态线程):在一个线程内部通过调度手段执行不同的工作
- 占用资源 => 每个线程有独立的方法栈
-
Thread 类的每一个实例代表 JVM 中的线程(start() 之后,且未结束),其他都不是线程
- 不同的执行流的同步执行是一切线程问题的根源
- 线程的开始方法只能是 Main 或 Thread.run 方法
Thread 底层模型
- JVM 中的线程(Thread 类的每一个实例)在 Linux 中称为轻量级进程,和进程无本质区别,JVM 中的进程共享内存空间
- JVM 中的线程(Thread 类的每一个实例)在 Windows 中使用系统进程
- 优点:简单,直接依赖操作系统的调度器
- 缺点:
- 占用资源多
- 上下文切换慢
- 不灵活,无法实现灵活的优先级
Thread 生命周期
Thread Life Cycle
- BLOCKED => A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method or reenter a synchronized block/method after calling Object.wait.
- WAITING => A thread is in the waiting state due to calling one of the following methods:
- Object.wait with no timeout
- Thread.join with no timeout
- LockSupport.park
- 同一个对象根据调用线程的不同返回不同的值
- 通常用于存储线程私有的值,方便后续流程使用
Runnable & Callable
- Runnable => 代表一个任务,可以被任何一个线程执行。没有返回值并且不能抛出 Checked Exception =>
public abstract void run();
- Callable => 和 Runnable 没有本质的区别,也代表一个任务。但是有返回值并且可以抛出 Checked Exception =>
V call() throws Exception;
Thread 中断
InterruptedException
- Thread.sleep() =>
public static native void sleep(long millis) throws InterruptedException;
- BlockingQueue.put() =>
void put(E e) throws InterruptedException;
- Thread.interrupt() => Interrupts this thread. => 唯一的用处就是取消一个耗时的操作
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread newThread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("I'm interrupted!");
throw new RuntimeException(e);
}
});
newThread.start();
Thread.sleep(500);
newThread.interrupt();
}
}
- 允许线程A对线程B进行有限程度的控制
- 线程可以选择不响应 InterruptedException,但是不推荐
- 除非你非常确定目标线程的中断特性,否则不要中断它
- 任何线程都可以被中断吗 => 中断与否取决于线程自己的决定
- 中断会发生在哪里?=> 线程自己的策略
- JVM 的阻塞方法都会正常响应中断 => e.g. Thread.sleep()
public class TaskRunnable implements Runnable {
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// restore interrupted status
// 如果没有能力处理中断,那么请重新设置中断标志位,使得其他人能够知道该线程被中断了
Thread.currentThread().interrupt();
}
}
}
- JMM(Java Memory Model) => Java 内存模型
- 栈帧中的局部变量是线程私有的
- 除此之外其他东西都是共享的
Java Memory Model
- 主内存
- 工作内存 => 存有一个全局的主内存的变量的副本
- 公有变量会存在私有的副本
public class Main {
public static boolean cancelled = false;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (true) {
if (cancelled) {
// 取消自己做的事情
break;
}
try {
// 做一些定时器相关的事情
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
Thread.sleep(1000);
// 注意:修改的是主线程的副本,其他的线程有可能会没有感知 cancelled 已经更改
cancelled = true;
}
}
多线程问题
- 线程安全 => 当一个类在多线程环境下被使用时仍能表现出正确的行为 => 竞争条件 + 死锁 => synchronized
- 协同 => 同时、随机执行的线程,线程之间如何协同工作 => wait()/notify()/notifyAll()
- 效率和易用性 => 执行地越快越好 + 用起来越不容易出错越好
线程安全
竞争条件
class Main {
public static int globalI = 0;
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
// 多个线程同时读取 globalI 的值,之后加1,之后写回去
// 使用 volatile 也不可以,因为不是原子操作
new Thread(() -> globalI++).start();
}
System.out.println("globalI: " + globalI); // 正确应该是 100
}
}
// 创建单例
class SingletonObject {
private static SingletonObject SINGLETON_INSTANCE;
public static SingletonObject getInstance() {
// 创建过程非常昂贵,希望第一次使用的时候才去创建
// check-then-act
// 多线程模式下可能创建多个实例
if (SINGLETON_INSTANCE == null) {
SINGLETON_INSTANCE = new SingletonObject();
}
return SINGLETON_INSTANCE;
}
private SingletonObject() {
// ... expensive operations
}
}
// ConcurrentHashMap 多个操作联级
class Main {
public static Map<String, Object> values = new ConcurrentHashMap();
public static void main(String[] args) {
// 多个操作联级,不是原子的
// 使用 values.putIfAbsent("key", calculate("")); 代替
if (!values.containsKey("key")) {
values.put("key", calculate(""));
}
}
private static Object calculate(String s) {
return null;
}
}
-
原子性 => 解决方案:
- 不可变对象
- 各种锁 => synchronized/Lock
- 并发工具包 => java.util.concurrent => 底层通常是 CAS
- int/long => AtomicInteger/AtomicLong
- HashMap => ConcurrentHashMap
- ArrayList => CopyOnWriteArrayList
- TreeMap => ConcurrentSkipListMap
- [] => AtomicLongArray
- Object => AtomicReference
CAS
- CAS => Compare And Swap => 只有在看到内存中的值是预期的值的时候,更新才会成功,否则将不会成功。其和死循环一起使用,只要不成功,就再次更新一次
- Expectation(预期值) + New(新值) + Old(旧值)
- CAS => 若 E == O 则 O 更新为 N,否则失败
-
ABA 问题 => 旧值原来是 A,其他线程将旧值更新为 B,之后又更新为 A => TODO:Google Search CAS and ABA
- 原有链表 C -> D -> A,要将尾节点换为 B,原子操作
- CAS => 当且仅当尾节点是 A 的时候将 A 替换为 B
- 原有链表变为 C -> E,D -> A 从原来链表出来了,此时 CAS 还会正常更新
- 上述不仅仅是 A 的值是关键的,整个环境(链表状态)也是关键的
- 解决方案:时间戳。在更新 A 的时候不仅记录 A 的值,还记录当前时间戳,可以知道当 A 替换为 B 之后又变为 A,此时的 A 已经不是最开始的 A 了
- 乐观锁「自旋锁(spin lock)」=> 我现在追不到你,过一会再来问一下 => while(true) 等待 => 占用 CPU => CAS
- ConcurrentHashMap => 乐观锁 => 基本上都是 CAS 操作
- 悲观锁 => 我现在追不到你,沮丧地自暴自弃(阻塞)了 => 等待获取锁,获取到锁之后就可以执行了 => 放弃 CPU,等待被唤醒 => synchronized
- HashMap 线程不安全 => HashTable 是将 HashMap 的所有方法签名加添加 synchronized,是线程安全的 => 悲观锁
- HashMap 线程不安全 => synchronizedMap 使用 Decorator 模式,将一个 Map 包装成一个 SynchronizedMap,将所有方法添加 synchronized => 悲观锁
public class Main {
private static int counter = 0;
private static int pessimisticCounter = 0;
private static final AtomicInteger optimismCounter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
counter++;
optimismIncrement();
pessimisticIncrement();
}).start();
}
Thread.sleep(100);
System.out.println("线程不安全:" + counter);
System.out.println("乐观锁:" + optimismCounter);
System.out.println("悲观锁:" + pessimisticCounter);
}
private static void optimismIncrement() {
optimismCounter.getAndIncrement();
}
private synchronized static void pessimisticIncrement() {
pessimisticCounter++;
}
}
synchronized
- Java 语言级的支持,1.6之后性能极大提高
- 字节码层面的实现 => MONITORENTER/MONITOREXIT
public class Main {
/**
* bytecode
* // access flags 0x21
* public synchronized foo()V
* L0
* LINENUMBER 5 L0
* GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
* LDC ""
* INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/String;)V
* L1
* LINENUMBER 6 L1
* RETURN
* L2
* LOCALVARIABLE this Lcom/example/demo/Main; L0 L2 0
* MAXSTACK = 2
* MAXLOCALS = 1
*/
public synchronized void foo() {
System.out.println("");
}
/**
* bytecode
* // access flags 0x1
* public bar()V
* TRYCATCHBLOCK L0 L1 L2 null
* TRYCATCHBLOCK L2 L3 L2 null
* L4
* LINENUMBER 9 L4
* ALOAD 0
* DUP
* ASTORE 1
* MONITORENTER
* L0
* LINENUMBER 10 L0
* GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
* LDC ""
* INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/String;)V
* L5
* LINENUMBER 11 L5
* ALOAD 1
* MONITOREXIT
* L1
* GOTO L6
* L2
* FRAME FULL [com/example/demo/Main java/lang/Object] [java/lang/Throwable]
* ASTORE 2
* ALOAD 1
* MONITOREXIT
* L3
* ALOAD 2
* ATHROW
* L6
* LINENUMBER 12 L6
* FRAME CHOP 1
* RETURN
* L7
* LOCALVARIABLE this Lcom/example/demo/Main; L4 L7 0
* MAXSTACK = 2
* MAXLOCALS = 3
*/
public void bar() {
synchronized (this) {
System.out.println("");
}
}
}
- 锁住的是什么?
- synchronized(<Object>) => 锁住了对象
- public synchronized void foo() {} => 锁住了 this
- public synchronized static void bar() {} => 锁住了当前的 class 对象
- 底层实现
-
对象头定义
synchronized 对象头
- synchronized 锁膨胀过程
- 无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
- 无锁 => 当前没有任何线程占用锁
- 偏向锁 => 一直以来都只有一个线程使用锁,下次直接用
- 轻量锁 => 有线程竞争的情况发生,但是不严重,假如某个线程能抢到锁,就不需要获取 monitor => 曾经有过多个线程在抢锁,但是同一个时刻只有一个人在抢锁
- 重量锁 => 同一个时刻有多个线程在同时竞争锁,所以必须要获取 monitor
- 锁粗化 => 同一个锁要连续频繁加锁解锁,粗化为更大范围的锁 => 编译器层面优化
public class Main {
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
// 频繁地加锁解锁,此时编译器可以将其优化成一个更大范围的锁
foo();
}
}
public static synchronized void foo() {
System.out.println("");
}
}
- 锁消除 => 通过逃逸分析,发现一个对象不可能被其他线程所竞争,就不需要上锁 => 编译器层面优化
public class Main {
public void foo() {
Object object = new Object();
// Synchronization on local variable 'object'
synchronized (object) {
System.out.println("");
}
}
public Object bar() {
Object object = new Object();
// Synchronization on local variable 'object'
synchronized (object) {
System.out.println("");
}
// 此时其他线程可能拿到局部变量,object 逃逸
return object;
}
}
死锁
public class Main {
public static final Object lock1 = new Object();
public static final Object lock2 = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (lock2) {
try {
Thread.sleep(500);
synchronized (lock1) {
System.out.println("new Thread");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
synchronized (lock1) {
try {
Thread.sleep(500);
synchronized (lock2) {
System.out.println("main Thread");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
- 排查死锁
- jps(找到出问题的 Java 进程 PID) + jstack <PID>(查看栈轨迹) + 源代码
- 定时任务 + jstack + 源代码
- 避免死锁 => 所有的资源都以相同的顺序获得锁
线程协同
Java 原生线程协同机制
- wait()
- notify()
- notifyAll()
/**
* 生产者/消费者模型
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
Container container = new Container();
Consumer consumer = new Consumer(container);
Producer producer = new Producer(container);
producer.start();
consumer.start();
producer.join();
consumer.join();
}
static class Container {
// 通过修改 Container 从而可以让流程变得复杂,只需更改 Producer 和 Consumer 的循环条件
Object value;
}
static class Producer extends Thread {
private final Container container;
public Producer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
// 如果 container 中没有值则 produce ,如果有值则等待 consume
synchronized (container) {
// 必须放置在 while 循环中,因为有可能唤醒了之后条件还是不允许(虚假唤醒 spurious awake)
// https://github.com/frohoff/jdk8u-jdk/blob/da0da73ab82ed714dc5be94acd2f0d00fbdfe2e9/src/share/classes/java/lang/Object.java#L418-L419
while (container.value != null) {
try {
container.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
int random = new Random().nextInt(40);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("produce -> " + random);
container.value = random;
container.notify();
}
}
}
}
static class Consumer extends Thread {
private final Container container;
public Consumer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (container) {
while (container.value == null) {
try {
container.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("consumer -> " + container.value);
container.value = null;
container.notify();
}
}
}
}
}
缺点
- synchronized 在 1.6 之前性能不高
- wait/notify 太原始了,难用
- 不够灵活
- synchronized 只能修饰一个代码块或方法,如果需要在程序的一个类里面加锁,在另一个类里面解锁,synchronized 做不到。
- 实现优先级。上述生产者消费者模型优先级完全取决于操作系统对线程的调度
JUC
- JUC => java.util.concurrent
- 相较于 Java 原生线程协同机制
- 提高性能 => 使用了 AtomicInteger 等原子操作,通过 CAS 提高了性能
- 灵活 => 提供了多种场景下更方便的实现
- 易用
AtomicXXX
Lock/Condition
/**
* 生产者/消费者模型
*/
public class Main {
private static final int MAX_QUEUE_SIZE = 10;
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Condition queueEmpty = lock.newCondition();
Condition queueFull = lock.newCondition();
Queue<Integer> queue = new LinkedList<>();
for (int i = 0; i < 3; i++) {
// 3个 Consumer 线程会一直等待 Producer 线程生产
new Consumer(queue, lock, queueEmpty, queueFull).start();
}
for (int i = 0; i < 9; i++) {
new Producer(queue, lock, queueEmpty, queueFull).start();
}
}
static class Producer extends Thread {
private final Queue<Integer> queue;
private final Lock lock;
private final Condition queueEmpty;
private final Condition queueFull;
public Producer(Queue<Integer> queue, Lock lock, Condition queueEmpty, Condition queueFull) {
this.queue = queue;
this.lock = lock;
this.queueEmpty = queueEmpty;
this.queueFull = queueFull;
}
@Override
public void run() {
lock.lock();
try {
while (queue.size() >= MAX_QUEUE_SIZE) {
queueEmpty.await();
}
try {
Thread.sleep(400);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
int value = new Random().nextInt(40);
System.out.println("Producer: " + value);
queue.add(value);
queueFull.signalAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
static class Consumer extends Thread {
private final Queue<Integer> queue;
private final Lock lock;
private final Condition queueEmpty;
private final Condition queueFull;
public Consumer(Queue<Integer> queue, Lock lock, Condition queueEmpty, Condition queueFull) {
this.queue = queue;
this.lock = lock;
this.queueEmpty = queueEmpty;
this.queueFull = queueFull;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
lock.lock();
try {
while (queue.size() == 0) {
// Consumer 会等待 Producer 生产
queueFull.await();
}
while (queue.size() > 0) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Integer poll = queue.poll();
System.out.println("Consumer: " + poll);
}
queueEmpty.signalAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
}
}
/**
* 线程安全:Lock
* 线程协同:Condition
*/
public class ThreadCoordination {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition completed = lock.newCondition();
AtomicInteger remainderThread = new AtomicInteger(10);
Map<Integer, Integer> result = new HashMap<>();
for (int i = 0; i < remainderThread.get(); i++) {
new Thread(() -> {
int value = new Random().nextInt();
lock.lock();
try {
Thread.sleep(100);
result.put(value, value);
// 此处不能使用 int。原因 => Error: Variable used in lambda expression should be final or effectively final
remainderThread.decrementAndGet();
completed.signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}).start();
}
lock.lock();
try {
while (remainderThread.get() > 0) {
try {
completed.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} finally {
lock.unlock();
}
System.out.println(result);
}
}
CountDownLatch
- 倒数闭锁
- 用于协调一组线程的工作
- Api => countDown() + await()
/**
* 线程安全:ConcurrentHashMap
* 线程协同:CountDownLatch
*/
public class ThreadCoordination {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
Map<Integer, Integer> result = new ConcurrentHashMap<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
int value = new Random().nextInt();
try {
Thread.sleep(100);
// 有可能生成的 value 是相同的,此时 result.size < 10
result.put(value, value);
countDownLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
countDownLatch.await();
System.out.println(result);
}
}
CyclicBarrier
- 循环的屏障 => 用于协调一组线程达到一个共同的屏障点继续
- Api:await()
/**
* 线程安全:ConcurrentHashMap
* 线程协同:CountDownLatch
*/
public class ThreadCoordination {
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
Map<Integer, Integer> result = new ConcurrentHashMap<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
int value = new Random().nextInt();
try {
Thread.sleep(100);
// 有可能生成的 value 是相同的,此时 result.size < 10
result.put(value, value);
cyclicBarrier.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
cyclicBarrier.await();
System.out.println(result);
// cyclicBarrier 可以使用多次
for (int i = 0; i < 10; i++) {
new Thread(() -> {
int value = new Random().nextInt();
try {
Thread.sleep(100);
// 有可能生成的 value 是相同的,此时 result.size < 10
result.put(value, value);
cyclicBarrier.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
cyclicBarrier.await();
System.out.println(result);
}
}
Semaphore
- 信号量 => 当信号量为1时,就是锁(Lock),排他锁就是一个信号量为1的特殊的锁
- acquire() + release()
- 用于协同
/**
* 生产者/消费者模型
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
Semaphore fullSlot = new Semaphore(0);
Semaphore emptySlot = new Semaphore(3);
Semaphore lock = new Semaphore(1);
Queue<Integer> queue = new LinkedList<>();
for (int i = 0; i < 9; i++) {
new Consumer(queue, lock, fullSlot, emptySlot).start();
}
// Producer 在等待 Consumer 消费
new Producer(queue, lock, fullSlot, emptySlot).start();
}
static class Producer extends Thread {
private final Queue<Integer> queue;
private final Semaphore lock;
private final Semaphore fullSlot;
private final Semaphore emptySlot;
public Producer(Queue<Integer> queue, Semaphore lock, Semaphore fullSlot, Semaphore emptySlot) {
this.queue = queue;
this.lock = lock;
this.fullSlot = fullSlot;
this.emptySlot = emptySlot;
}
@Override
public void run() {
while (true) {
try {
emptySlot.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock) {
try {
Thread.sleep(400);
int value = new Random().nextInt(40);
System.out.println("Producer: " + value);
queue.add(value);
fullSlot.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
static class Consumer extends Thread {
private final Queue<Integer> queue;
private final Semaphore lock;
private final Semaphore fullSlot;
private final Semaphore emptySlot;
public Consumer(Queue<Integer> queue, Semaphore lock, Semaphore fullSlot, Semaphore emptySlot) {
this.queue = queue;
this.lock = lock;
this.fullSlot = fullSlot;
this.emptySlot = emptySlot;
}
@Override
public void run() {
try {
fullSlot.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock) {
try {
Thread.sleep(200);
Integer poll = queue.poll();
System.out.println("Consumer: " + poll);
emptySlot.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
Exchanger
/**
* 生产者/消费者模型
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<>();
// 如果有多个 consumer,那么消费的值都是 null
Consumer consumer = new Consumer(exchanger);
// 如果有多个 producer,那么线程可能等待或者随机消费完
Producer producer = new Producer(exchanger);
consumer.start();
producer.start();
producer.join();
consumer.join();
}
static class Producer extends Thread {
private final Exchanger<Integer> exchanger;
public Producer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
int value = new Random().nextInt(40);
System.out.println(getName() + " Producer: " + value);
Thread.sleep(400);
exchanger.exchange(value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
static class Consumer extends Thread {
private final Exchanger<Integer> exchanger;
public Consumer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
// Consumer 在等 Producer 生产
for (int i = 0; i < 6; i++) {
try {
Thread.sleep(200);
System.out.println("Consumer: " + exchanger.exchange(null));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
BlockingQueue/BlockingDeque
- 传统的集合框架的操作要么正常返回,要么丢出异常 => BlockingQueue/BlockingDeque 提供一种等待的可能
- 阻塞 API: put()/take()
- 实现 => LinkedBlockingQueue + ArrayBlockingQueue
- 无需考虑线程安全(同步)问题,BlockingQueue 内部进行了同步
- LinkedBlockingQueue
=> 默认容量是 Integer.MAX_VALUE
=> 如果等待队列非常大,有资源耗尽的风险
/**
* 生产者/消费者模型
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
List<Consumer> consumerList = new ArrayList<>();
List<Producer> producerList = new ArrayList<>();
for (int i = 0; i < 8; i++) {
consumerList.add(new Consumer(queue));
}
for (int i = 0; i < 2; i++) {
producerList.add(new Producer(queue));
}
consumerList.forEach(Thread::start);
producerList.forEach(Thread::start);
for (Consumer consumer : consumerList) {
consumer.join();
}
for (Producer producer : producerList) {
producer.join();
}
}
static class Producer extends Thread {
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
// Producer 在等 consumer 消费
while (true) {
try {
int value = new Random().nextInt(400);
System.out.println(getName() + " Producer: " + value + "。BlockingQueue size: " + queue.size());
Thread.sleep(800);
queue.put(value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
static class Consumer extends Thread {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(2000);
Integer take = queue.take();
System.out.println("Consumer: " + take);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
Future & ExecutorService
public class ExecutorTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.txt -> a a a a a b b b b c c c
// 2.txt -> a a a a a b b b b c c c
// 3.txt -> a a a a a b b b b c c c
List<File> files = Arrays.asList(new File("1.txt"), new File("2.txt"), new File("3.txt"));
ExecutorService threadPoll = Executors.newFixedThreadPool(3);
// ExecutorService threadPoll = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), (runnable) -> new Thread(runnable, "My-thread"), new ThreadPoolExecutor.DiscardPolicy());
// ExecutorService threadPoll = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), (runnable) -> {
// Thread thread = new Thread(runnable, "My-thread");
//
// // 将线程设置为 daemon,The Java Virtual Machine exits when the only threads running are all daemon threads.
// thread.setDaemon(true);
// return thread;
// }, new ThreadPoolExecutor.DiscardPolicy());
List<Future<Map<String, Integer>>> futureList = new ArrayList<>();
for (File file : files) {
futureList.add(threadPoll.submit(new WordCounter(file)));
}
Map<String, Integer> result = new HashMap<>();
try {
for (Future<Map<String, Integer>> future : futureList) {
Map<String, Integer> stringIntegerMap = future.get();
stringIntegerMap.forEach((key, value) -> result.merge(key, value, Integer::sum));
}
} finally {
// 此处如果不 shutdown 那么 java 进程将不会退出
// 除了 shutdown,也可将线程池里面的线程都设置为 daemon 的
// https://github.com/bowen-wu/jdk8u-jdk/blob/master/src/share/classes/java/lang/Thread.java#L1332-L1355
threadPoll.shutdown();
}
System.out.println(result); // {a=15, b=12, c=9}
}
static class WordCounter implements Callable<Map<String, Integer>> {
private final File file;
public WordCounter(File file) {
this.file = file;
}
@Override
public Map<String, Integer> call() throws Exception {
Map<String, Integer> result = new HashMap<>();
List<String> lineList = Files.readAllLines(file.toPath());
for (String line : lineList) {
Thread.sleep(2000);
Arrays.stream(line.split("\\s+")).forEach(word -> result.merge(word, 1, Integer::sum));
}
/**
* Future 可能会抛出错误,错误会在 get 的时候拿到
*/
// if (file.getName().contains("2.txt")) {
// throw new RuntimeException("Future 可能会抛出错误,错误会在 get 的时候拿到");
// }
return result;
}
}
}
Fork/Join 框架
- Java 7 引入
- 分而治之策略的实例
-
ForkJoinPool
- public ForkJoinPool(int parallelism) => parallelism 指明在当前的 ForkJoinPool 中最大并发度,即可以同时运行的线程数量 => 默认值Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors())
- 使用 RecursiveTask 编写自定义的 Fork/Join 任务
- 工作窃取 => 使用双端队列(LinkedBlockingDeque)实现,偷任务从任务队列尾获取任务
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.txt -> a a a a a b b b b c c c
// 2.txt -> a a a a a b b b b c c c
// 3.txt -> a a a a a b b b b c c c
List<File> files = Arrays.asList(new File("1.txt"), new File("2.txt"), new File("3.txt"));
ForkJoinPool pool = new ForkJoinPool();
Map<String, Integer> result = pool.submit(new WordCounter(files)).get();
System.out.println(result); // {a=15, b=12, c=9}
}
static class WordCounter extends RecursiveTask<Map<String, Integer>> {
private final List<File> files;
public WordCounter(List<File> files) {
this.files = files;
}
@Override
protected Map<String, Integer> compute() {
if (files.isEmpty()) {
return Collections.emptyMap();
}
Map<String, Integer> result = new HashMap<>();
try {
// Fork
List<String> lineList = Files.readAllLines(files.get(0).toPath());
for (String line : lineList) {
Thread.sleep(200);
Arrays.stream(line.split("\\s+")).forEach(word -> result.merge(word, 1, Integer::sum));
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
// Fork
Map<String, Integer> countOfRestFiles = new WordCounter(files.subList(1, files.size())).compute();
// Join
countOfRestFiles.forEach((key, value) -> result.merge(key, value, Integer::sum));
return result;
}
}
}
Collection.parallelStream
public class ThreadCoordination {
public static void main(String[] args) {
// 1.txt -> a a a a a b b b b c c c
// 2.txt -> a a a a a b b b b c c c
// 3.txt -> a a a a a b b b b c c c
List<File> files = Arrays.asList(new File("1.txt"), new File("2.txt"), new File("3.txt"));
Map<String, Integer> result = files.parallelStream().reduce(new HashMap<>(), new WordCounter(), ThreadCoordination::merge);
System.out.println(result);
}
public static Map<String, Integer> merge(Map<String, Integer> stringIntegerMap, Map<String, Integer> stringIntegerMap2) {
Map<String, Integer> result = new HashMap<>();
stringIntegerMap.forEach((key, value) -> result.merge(key, value, Integer::sum));
stringIntegerMap2.forEach((key, value) -> result.merge(key, value, Integer::sum));
return result;
}
static class WordCounter implements BiFunction<Map<String, Integer>, File, Map<String, Integer>> {
@Override
public Map<String, Integer> apply(Map<String, Integer> accumulate, File file) {
Map<String, Integer> map = new HashMap<>();
try {
List<String> lineList = Files.readAllLines(file.toPath());
for (String line : lineList) {
Thread.sleep(1000);
Arrays.stream(line.split("\\s+")).forEach(word -> map.merge(word, 1, Integer::sum));
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
return map;
}
}
}
知识点
- 协程 => 解决了多线程调度较慢,占用资源多的问题,没有解决并发问题,如死锁或竞争条件。优点:
- 占用空间少 => 一套方法栈
- 更快 => 一直占用 CPU 时间片,无上下文切换。需要自己实现调度算法
- 让线程无限的在停着,什么也不做,不吃 CPU,不是死循环
new CountDownLatch(1).await();
new Semaphore(0).acquire();
-
BlockingQueue => 会阻塞的队列,最合适的场景是描述生产者和消费者
- 使用枚举创建单例模式 => Effective Java
public enum SingletonObject {
SINGLETON_INSTANCE;
}
-
mutex == lock => mutual(共享) + exclusive(排他)
- wait vs sleep
- wait => 不占有锁,等待通知,放弃资源 => Thread state WAITING
- sleep => 占有锁,不放弃资源 => Thread state BLOCK
网友评论