一、Fork/Join
Java7提供了Fork/Join用于并行执行任务的框架, 可以把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决,对开发来说也不再需要处理各种并行相关事务,例如同步、通信、死锁等问题,需要做的就是拆分任务并组合每个子任务的中间结果。
1.工作窃取
JDK1.7引入的Fork/Join框架就是基于工作窃取算法,是指某个线程从其他队列里窃取任务来执行。工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
一般会使用双端队列,比如AB线程分别处理AB两个任务队列,当有一个线程执行完一个任务队列时,会去窃取另一个未完成的队列任务,而被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
source: Fork/Join框架介绍
2.类关系和API
- RecursiveAction不需要返回值。
- RecursiveTask通过泛型参数设置计算的返回值类型。
- ForkJoinPool提供了一系列的submit方法,计算任务。ForkJoinPool默认的线程数通
Runtime.availableProcessors()
获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。
在RecursiveTask类中最重要的是实现compute()
接口,此处定义了计算和拆分方法。
fork() - 每个子任务在调用fork方法时,又会进入compute方法,如果还要拆分则继续递归调用。
join() - 使用join方法会等待子任务执行完并得到其结果。
invokeAll() - 可变参数,触发所有输入的Task来计算。
假设在计算一个求和算法时,就可以采用Fork/Join方法进行并行计算。
public class Calculator extends RecursiveTask<Integer> {
private static final int THRESHOLD = 100;
private int start;
private int end;
public Calculator(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if((start - end) < THRESHOLD){
for(int i = start; i< end;i++){
sum += i;
}
}else{
int middle = (start + end) /2;
Calculator left = new Calculator(start, middle);
Calculator right = new Calculator(middle + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
}
source:Java 7 Fork/Join 并行计算框架概览
二、CountDownLatch ( 重点! )
CountDownLatch,又名发令枪。这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
1.API及用法
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量,每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
- 设定初始值,即构造函数
//参数count为计数值
public CountDownLatch(int count) { };
- 主线程调用await等待
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException{};
- 任务线程执行完计数
//将count值减1
public void countDown() { };
2.使用场景
- 实现最大的并行性:有时想同时启动多个线程,实现最大程度的并行性。例如测试一个单例类,如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
- 开始执行前等待n个线程完成各自任务(应用最广):例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
- 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
三、CyclicBarrier
CyclicBarrier——回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行,叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
//参数parties指让多少个线程或者任务等待至barrier状态
public CyclicBarrier(int parties) {}
//参数barrierAction为当这些线程都达到barrier状态时会执行的内容
public CyclicBarrier(int parties, Runnable barrierAction) {}
CyclicBarrier中最重要的方法就是await方法,有重载方法但是一般采用:
//用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务
public int await() throws InterruptedException, BrokenBarrierException { };
处理同一个动作有多个线程同时执行的场景,用一个栅栏卡在同一个状态,只有当所有线程抵达该状态了,才可以全部放行。
public class TestCyclicBarrier {
public static void main(String[] args) {
//1.得到一个CyclicBarrier实例
CyclicBarrier cb = new CyclicBarrier(4);
new Thread(new Fishing(cb),"1").start();
new Thread(new Fishing(cb),"2").start();
new Thread(new Fishing(cb),"3").start();
new Thread(new Fishing(cb),"4").start();
}
static class Fishing implements Runnable{
CyclicBarrier cb;
public Fishing(CyclicBarrier cb) {
this.cb = cb;
}
@Override
public void run() {
try {
cb.await();
System.out.println("第(" + Thread.currentThread().getName()
+ ")个人开始钓鱼");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
CountDownLatch和CyclicBarrier的区别:
- CountDownLatch强调的是一个线程(或多个)需要等待另外的n个线程干完某件事情之后才能继续执行。
- CyclicBarrier强调的是n个线程,大家相互等待,只要有一个没完成,所有人都得等着。
- CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
四、Semaphore(推荐)
Semaphore——信号量,控制同时访问某个特定资源的线程数量,用在流量控制。
Semaphore可以控同时访问的线程个数,一个信号量有且仅有3种操作,且它们全部是原子的:初始化、增加和减少。其包含两个构造方法:
//参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {
sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}
acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
release()用来释放许可。注意,在释放许可之前,必须先获获得许可。
//获取一个许可
public void acquire() throws InterruptedException { }
//获取permits个许可
public void acquire(int permits) throws InterruptedException { }
//释放一个许可
public void release() { }
//释放permits个许可
public void release(int permits) { }
五、Callable、Future和FutureTask
java.lang.Runnable接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。如果想要返回结果,则可以采用Callable。
- Callable接口位于java.util.concurrent包下,里面只声明了一个方法
call()
:
public interface Callable<V> {
V call() throws Exception;
}
- 若需要执行Callable接口的方法,还需配合ExecutorService来调用
submit()
方法使之执行:
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
- Future,对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- FutureTask,对Future接口的具体实现(也是唯一实现),既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
public FutureTask(Callable<V> callable) { }
public FutureTask(Runnable runnable, V result) { }
实际使用的时候,可先定义Callble类、并重写call方法:
public class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}
接着可以有两种方式去启动有返回值的线程方法:
- 结合ExecutorService开启多线程
//创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
//创建Callable对象任务
Task task = new Task();
//提交任务并获取执行结果
Future<Integer> result = executor.submit(task);
//关闭线程池,拿完线程就可以把池子关闭了,及时释放资源
executor.shutdown();
- 结合Thread开启多线程
//构造一个有返回值的并行任务
Task task = new Task();
//构造FutureTask
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
//构造一个新线程并注入FutureTask
Thread thread = new Thread(futureTask);
//开启线程
thread.start();
- 执行
get()
方法会阻塞到线程执行完得到结果
try {
if(futureTask.get()!=null){
System.out.println("task运行结果"+futureTask.get());
}else{
System.out.println("future.get()未获取到结果");
}
} catch (ExecutionException e) {
e.printStackTrace();
}
网友评论