目录
目录.png概念
- 并行是指两个或者多个事件在同一时刻发生(cpu多核);而并发是指两个或多个事件在同一时间间隔内发生
饥饿,死锁, 活锁
- 饥饿: 优先级低的线程得到执行的机会很小,就可能发生线程饥饿
- 饥饿预防: 要避免使用线程优先级,因为这会增加平台依赖性,并可能导致活跃性(某件正确的事情最终会发生,主要问题包括死锁、饥饿、以及活锁)问题。在大多数并发应用程序中,都可以使用默认的线程优先级
- 活锁: 过度的错误处理代码导致无限循环处理错误,当多线程中出现了相互谦让,都主动将资源释放给别的线程使用,有可能自动解决。
- 活锁预防: 比如重试机制中引入随机性(不同时间)。 例如,在网络上,如果有两台机器尝试使用相同的载波来发送数据包,那么这些数据包就会发生冲突。这两台机器都检查到了冲突,并都在稍后再次发送。 如果二者都选择了在0.1秒后重试,那么会再次冲突,并且不断冲突下去,这时候需要改变重试时间。
- 死锁: 多个线程相互占用对方的资源的锁,而又相互等对方释放锁。
- 死锁条件:
(1) 互斥条件:一个资源每次只能被一个进程使用。
(2) 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
(3) 不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
(4) 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系
ForkJoinPool
- ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。
- ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
// newWorkStealingPool线程池的实现用到了ForkJoinPool,用到了分而治之,递归计算的算法, 抢占式
ExecutorService exec = Executors.newWorkStealingPool();
- ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况, 不太适合
- parallel()并行流并不一定有想象中那么美好,有时候并行流可能增加时间,导致时间增加的一个重要原因是处理器内存缓存限制, parallel底层就是用到了ForkJoinPool
- ForkJoinPool示例
public class TestForkJoinCalculator {
private final ForkJoinPool pool;
//执行任务RecursiveTask:有返回值 RecursiveAction:无返回值
private static class SumTask extends RecursiveTask<Long> {
private final long[] numbers;
private final int from;
private final int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
//此方法为ForkJoin的核心方法:对任务进行拆分 拆分的好坏决定了效率的高低
@Override
protected Long compute() {
// 当需要计算的数字个数小于6时,直接采用for loop方式计算结果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else { // 否则,把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public TestForkJoinCalculator() {
// 也可以使用公用的线程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
TestForkJoinCalculator calculator = new TestForkJoinCalculator();
long result = calculator.sumUp(numbers);
stopWatch.stop();
System.out.println("耗时:" + stopWatch.getTotalTimeMillis() + "ms");
System.out.println("结果为:" + result);
}
}
-
原理:
参考文章4的图
原理图.png
- 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式
- 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成
- 线程默认: 如果没有指定,则默认为Runtime.getRuntime().availableProcessors() - 1. 或者设置启动参数:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
- 当使用ThreadPoolExecutor时,使用分治法(分治时任务会很多)会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。ThreadPoolExecutor中的Thread无法选择优先执行子任务
基础
while true
- while true运行的线程除了中断,任务终止的最佳方法是设置任务周期性检查的标志
然后任务可以通过自己的 shutdown 进程并正常终止。不是在任务中随机关闭线程,而是要求任务在到达了一个较好时自行终止。
线程等待其他线程
join,countDownLatch, CyclicBarrier
CompletableFuture
-
CompletableFuture是受guava的的listenable future启发写的, 比Future更多功能,但是项目中实现多线程调用接口,并合并结果是用的是封装的RxJava2框架。RxJava2更能更多,比如CompletableFuture不支持lazy,RxJava2支持。复用了参考文章3的图
对比
构造方法非线程安全
- 构造函数和普通函数一样,并不是默认被synchronized 的,有可能出现同步问题, 比如有静态变量在构造函数处理,就有可能出现问题。
- 构造方法不需要同步化,因为它只可能发生在一个线程里,在构造方法返回值前没有其他线程可以使用该对象。
- this逃逸问题: this逃逸是指在构造函数返回之前其他线程就持有该对象的引用。this逃逸经常发生在构造函数中启动线程或注册监听器时
public class ThisEscape {
public ThisEscape() {
new Thread(new EscapeRunnable()).start();
// ...其他代码
}
private class EscapeRunnable implements Runnable {
@Override
public void run() {
// 在这里通过ThisEscape.this就可以引用外围类对象, 但是此时外围类对象可能还没有构造完成, 即发生了外围类的this引用的逃逸
}
}
}
改造:
public class ThisEscape {
private Thread t;
public ThisEscape() {
t = new Thread(new EscapeRunnable());
// ...其他代码
}
public void init() {
t.start();
}
private class EscapeRunnable implements Runnable {
@Override
public void run() {
// 在这里通过ThisEscape.this就可以引用外围类对象, 此时可以保证外围类对象已经构造完成
}
}
}
并发编程缺点
- 在线程等待共享资源时会降低速度。
- 线程管理产生额外 CPU 开销。
- 糟糕的设计决策带来无法弥补的复杂性。
创建一个 Thread, jvm做的事情
- 程序计数器,指明要执行的下一个 JVM 字节码指令。
- 用于支持 Java 代码执行的栈
- 第二个则用于 native code(本机方法代码)执行的栈
- thread-local variables (线程本地变量)的存储区域
- 用于控制线程的状态管理变量
超线程
- 指一种硬件技巧,能在单个处理器上产生非常快速的上下文切换,在某些情况下可以使内核看起来像运行两个硬件线(Java线程之间切换上下文是有代价)
线程数量
- 可以有几千个, 但是要考虑上下文切换代价
wait, notify
- wait方法需要释放锁,前提条件是它已经持有锁。所以wait和notify(或者notifyAll)方法都必须被包裹在synchronized语句块中,并且synchronized后锁的对象应该与调用wait方法的对象一样。否则抛出IllegalMonitorStateException。wait是在当前线程持有wait对象锁的情况下,暂时放弃锁,并让出CPU资源,并积极等待其它线程调用同一对象的notify或者notifyAll方法。
- 并发工具优先于wait和notify
volatile
- 如果你正在尝试调试其他人的并发代码,请首先查找使用 volatile 的代码并将其替换为Atomic 变量,成本低很多(java8之后)。
字分裂
- (在 Java 中 long 和 double 类型都是 64 位),写入变量的过程分两步进行,就会发生 Word tearing (字分裂)情况。 JVM 被允许将64位数量的读写作为两个单独的32位操作执行用 volatile 修饰符定义一个 long 或 double 变量,可阻止字分裂情况
volatile可见性
- 当一个任务更改标志值时,这些更改可以存储在本地处理器缓存中,而不会刷新到主内存
使用 AtomicBoolean 类型作为标志值的办法替代volatile也可以。发送一条Lock前缀的指令会强制将对缓存的修改操作立即写入主内存。 - 如果是写操作,它会导致其他CPU中对应的缓存行无效。为了保证各个处理器缓存一致,每个处理会通过嗅探在总线上传播的数据来检查 自己的缓存是否过期,当处理器发现自己缓存行对应的内存地址被修改了,就会将当前处理器的缓存行设置成无效状态。
volatile 重排与 Happen-Before 原则
happens before 担保原则( volatile (易变性)操作通常称为 memory barrier (通过Lock前缀指令生内存屏障)
one,two,three 变量赋值操作就可以被重排, xyz也是
happens before 担保原则确保 volatile 变量的读写指令不能跨过内存屏障进行重排
happens before 担保原则还有另一个作用:当线程向一个 volatile 变量写入时,在线程写入之前的其他所有变量(包括非 volatile 变量)也会刷新到主内存。当线程读取一个 volatile 变量时,它也会读取其他所有变量(包括非 volatile 变量)与 volatile 变量一起刷新到主内存
public void run() {
one = 1;
two = 2;
three = 3;
volaTile = 92;
int x = four;
int y = five;
int z = six;
}
happens-before原则
- JMM规定了JVM必须遵循一组最小保证
- 在JMM中,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作之间必须存在happens-before关系。HB 原则是对单线程环境下的指令重排序以及多线程环境下的线程间数据的一致性进行的约束。
- happen-before原则是JMM中非常重要的原则,它是判断数据是否存在竞争、线程是否安全的主要依据,保证了多线程环境下的可见性。
- 单线程happen-before原则:在同一个线程中,前面的操作产生的结果必须对后面的操作可见,书写在前面的操作happen-before后面的操作(必须有数据依赖,无数据依赖则有可能指令重排)。
- 锁的happen-before原则:同一个锁的unlock操作happen-before此锁的lock操作
- volatile的happen-before原则: 对一个volatile变量的写操作happen-before对此变量的任意操作。
- happen-before的传递性原则: 如果A操作 happen-before B操作,B操作happen-before C操作,那么A操作happen-before C操作。
- 线程启动的happen-before原则:同一个线程的start方法happen-before此线程的其它方法
- 线程中断的happen-before原则:对线程interrupt方法的调用happen-before被中断线程的检测到中断发送的代码(interrupt 方法改变的状态必须对后续执行的检测方法可见)
- 线程终结的happen-before原则:线程中的所有操作都happen-before线程的终止检测。
- 对象创建的happen-before原则:一个对象的初始化完成先于他的finalize方法调用。
原子性
- 同步机制强制多核处理器系统上的一个任务做出的修改必须在应用程序中是可见的。如果没有同步机制,那么修改时可见性将无法确认。原子性并不保证可见性。
- automic原子类: 快速、无锁的操作,它们是利用了现代处理器上可用的机器级原子性
- 使用显示锁ReentrantLock之类的: 如果使用 synchronized 关键字失败,就会抛出异常,但是你没有机会进行任何清理以保持系统处于良好状态。而使用显式锁对象,可以使用 finally 子句在系统中维护适当的状态。显式锁比起内置同步锁提供更细粒度的加锁和解锁控制。
- 无锁: cow cas, ConcurrentHashMap 不会抛出concurrentmodificationexception(并发安全的,迭代安全,但是迭代完整没办法保证)CopyOnWriteArrayList 的其中一个好处是,当多个迭代器遍历和修改列表时,它不会抛出 ConcurrentModificationException 异常。对集合迭代时 对原集合进行一份拷贝,对拷贝的新元素进行迭代,这叫安全失败。
- cas: 在 比较并交换 (CAS) 中,你从内存中获取一个值,并在计算新值时保留原始值。然后使用 CAS 指令,它将原始值与当前内存中的值进行比较,如果这两个值是相等的,则将内存中的旧值替换为计算新值的结果,所有操作都在一个原子操作中完成。许多现代处理器的汇编语言中都有一条 CAS 指令,并且也被 JVM 中的 CAS 操作(例如 Atomic 类中的操作)所使用。CAS 指令在硬件层面中是原子性的,并且与你所期望的操作一样快。
java并发中常见的锁
- 偏向锁,轻量级锁,重量级锁
- 偏向锁: 其核心的思想是,如果程序没有竞争,则取消之前已经取得锁的线程同步操作。也就是锁消除
- 轻量级锁: 其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能,自旋等待
- 重量级锁:synchronized这类的,追求吞吐量。同步块执行速度较长。
- 乐观锁,悲观锁
- 乐观锁,认为多线程竞争不激烈,竞争不激烈的情况下, 在数据库表中增加版本号,先查数据,然后根据之前的版本号更新这条数据,找得到更新,找不到不更,这就是乐观锁的实现。
- 悲观锁: 认为多线程竞争激烈。
- 公平锁,非公平锁
- sychronized为非公平锁,锁获取随机, 公平锁的锁获取是根据申请时间的,非公平锁处理的快。
- 可重入锁
- 可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。
- 独享锁/共享锁
- 独享锁是指该锁一次只能被一个线程所持有;共享锁是指该锁可被多个线程所持有。
- 对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是共享锁,其写锁是独享锁。
- 分段锁
- 分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操
多线程获取结果对比RxJava2使用
- 多线程版本(伪代码)
private Response asyncHandle(SearchRequest request){
Response response = new HResponse();
response.setLowRates(new ArrayList<>());
List<Future<Response>> futures = new ArrayList<>();
while (iteratorCity.hasNext()) {
futures.add(threadPool.submit(new SearchRunner(countryID, request));
}
while(iter.hasNext()) {
if(timeout(start)) {
System.ou.println("....");
break;
}
Future<Response> future = iter.next();
try {
Response resp = future.get(Config.getSingleTimeout(), TimeUnit.MILLISECONDS);
if(resp != null) {
response.getLowRates().addAll(resp.getLowRates());
iter.remove();
}
}
// 省略catch
}
return response;
}
- RxJava2版本
- 异步操作,中间执行的任务可以是异步网络操作,控制socket timeout之类的可以在这块处理。更优雅。
// 常见的示例,这是一个异步操作
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
// 这里被指定在IO线程
singleSubscriber.onSuccess(addValue(1, 2));
}
})
.subscribeOn(Schedulers.io())// 指定运行在IO线程
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(Integer o) {
// o = 3
}
});
- zip
Single.zip(s1, s2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer o, Integer o2) {
LogHelper.e("A:" + o + "=" + o2);
return null;
}}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
LogHelper.e("kk:"+s);
}
});
网友评论