背景
在一些场景中,我们需要获取多份数据,而这些数据获取的先后顺序是无关的,我们只需要把数据收集齐,然后再对这些数据统一处理。
比如:执行两个任务 task1 和 task2,都执行完毕后,再把两个任务的结果相加。
例如执行 task1 需要 1s,执行 task2 需要 2s,顺序执行,那么总时间是 1s + 2s = 3s。而如果我们用异步的方式,task1 和 task2 同时执行,则能减少响应的时间。

但并行的方式则必须注意,需要等待所有任务执行完毕之后,才能计算最终的结果,因此这就涉及多线程之间的等待。
方案一:Future.get() 获取数据
先创建一个线程池,并使用 ExecutorService.submit()
方法提交两个 Callable
任务。
提交任务后,这俩任务将开始异步并行执行,并返回了 Future
类型的对象,代表一个未来能获取结果的对象。
当我们调用 Future
对象的 get()
方法时,如果提交的任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么 get()
会阻塞当前线程,直到所有任务都完成后,才能获得执行的结果。
注意:当 submit()
时,任务就已经开始执行,但不会阻塞线程,当 get()
时,如果还有未完成的任务,才会阻塞当前主线程的运行,直到所有任务都完成后,才能获得执行的结果。
void executorServiceExample() throws Exception {
long time1 = System.currentTimeMillis();
// 任务1
Callable<Integer> task1 = () -> {
Thread.sleep(1000);
return 10;
};
// 任务2
Callable<Integer> task2 = () -> {
Thread.sleep(2000);
return 20;
};
// 创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 提交任务2个任务
Future<Integer> submit1 = service.submit(task1);
Future<Integer> submit2 = service.submit(task2);
// 获取任务结果,由于任务还未执行完毕,因此下一行将阻塞当前线程,直到所有任务都执行完毕
Integer result1 = submit1.get();
Integer result2 = submit2.get();
System.out.println("总耗时:" + (System.currentTimeMillis() - time1)); // 总耗时:2012
System.out.println(result1 + result2); // 30
service.shutdown();
}
方案二:CountdownLatch
CountdownLatch 用来控制一个或者多个线程等待多个线程。
维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。

void countdownLatchExample() throws InterruptedException {
long time = System.currentTimeMillis();
// 创建 CountDownLatch,计数为2
CountDownLatch countDownLatch = new CountDownLatch(2);
// 创建线程池
ExecutorService executorService = Executors.newCachedThreadPool(2);
// 任务1
Runnable task1 = () -> {
System.out.println("begin task 1");
Thread.sleep(1000); // 省略 try-catch
System.out.println("end task 1");
countDownLatch.countDown(); // 计数减1
};
// 任务2
Runnable task2 = () -> {
System.out.println("begin task 2");
Thread.sleep(2000); // 省略 try-catch
System.out.println("end task 2");
countDownLatch.countDown(); // 计数减1
};
// 并行执行任务
executorService.execute(task1);
executorService.execute(task2);
// 等待2个任务执行完毕,countDownLatch计数器为0,才往下执行
countDownLatch.await();
System.out.println("total time: " + (System.currentTimeMillis() - time));
executorService.shutdown();
}
方案三:CyclicBarrier
CyclicBarrier 用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。
和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch 的一个区别是:
1、CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。
2、countDownLatch.countDown() 只会将计数减 1,不会阻塞当前线程,countDownLatch.await() 只会阻塞当前线程,不会减少计数;而cyclicBarrier.await() 既会阻塞当前线程,也会计数减1。
CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 当计数为0时回调该方法。
void cyclicBarrierExample() {
long time = System.currentTimeMillis();
// 创建CyclicBarrier,计数为2,当计数为0时会回调该方法
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println("all task end");
System.out.println("total time: " + (System.currentTimeMillis() - time));
});
// 创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 任务1
Runnable task1 = () -> {
System.out.println("begin task 1");
Thread.sleep(1000); // 省略 try-catch
System.out.println("end task 1");
cyclicBarrier.await(); // 计数减1并阻塞当前线程,省略了try-catch
};
// 任务2
Runnable task2 = () -> {
System.out.println("begin task 2");
Thread.sleep(2000); // 省略 try-catch
System.out.println("end task 2");
cyclicBarrier.await(); // 计数减1并阻塞当前线程,省略了try-catch
};
executorService.execute(task1);
executorService.execute(task2);
executorService.shutdown();
}
网友评论