十三、多线程执行器
FutureTask 和 Callable
可以用来启动一个需要很长时间的计算任务
// Callable 与 Runnable 类似,但是有返回值
Callable<Integer> callable=()->{
int sum=0;
for (int i = 0; i < 1000000000; i++) {
sum+=i;
}
return sum;
};
// FutureTask 可以将 Callable 包装成实现了 Runnable 和 Future 接口的对象
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// 开启线程运行任务
Thread thread=new Thread(futureTask);
thread.start();
// 获取任务的执行结果,未计算完会阻塞
// 如果计算线程被中断,抛出InterruptedException
futureTask.get();
//
try {
// 设置最长阻塞时间,超时还没得出结果会抛出异常
futureTask.get(10L, TimeUnit.SECONDS);
} catch (TimeoutException e) {
e.printStackTrace();
}
// 如果计算还没开始,取消该任务。如果计算处于运行中,且参数为 true ,则计算会被中断
futureTask.cancel(true);
futureTask.isDone();
futureTask.isCancelled();
System.out.println(futureTask.get());
线程池
线程创建是有消耗的,可以提前创建一些线程,需要的时候直接使用。维护这些线程的池子,也就是线程池
执行器
通过 Executors 的工厂方法获得线程池的执行器 ExecutorService
用 ExecutorService 来执行线程并得到一个 Future
用 Future 监视线程执行状况
// 执行器
ExecutorService executorService = Executors.newCachedThreadPool();
// 必要时创建新线程,空闲线程会被保留 60秒
Executors.newCachedThreadPool();
// 包含固定数量的线程,空闲线程会一直被保留,参数为线程数
Executors.newFixedThreadPool(32);
// 只有一个线程的“池”,会按顺序执行提交的任务
Executors.newSingleThreadExecutor();
// 用于预定执行一些任务而构建的固定线程池,支持定时及周期性的执行任务,参数为线程数
Executors.newScheduledThreadPool(32);
// ScheduledThreadPool 的单线程“池”版
Executors.newSingleThreadScheduledExecutor();
Runnable runnable=()->{};
Callable callable=()->1;
Future future;
// 提交一个 Runnable 或者 Callable 对象给 Executor 并返回一个 future 来监控执行
// 调用 future.get() 会返回 null ,runnable 没返回值
future=executorService.submit(runnable);
// 调用 future.get() 会在任务完成后,把第二个参数传入的对象返回
future=executorService.submit(runnable,new Object());
future=executorService.submit(callable);
// 当不在需要提交任何任务的时候调用,关闭执行器,不再接受新任务,当所有任务完成后,线程池的线程死亡
executorService.shutdown();
// 会取消尚未开始的任务,并且试图中断正在运行的线程
executorService.shutdownNow();
预定和周期性的执行任务
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(12);
Runnable runnable=()->{};
Callable callable=()->1;
// 在指定延迟后执行任务 , 在 10s 后执行任务
executorService.schedule(runnable,10, TimeUnit.SECONDS);
executorService.schedule(callable,10, TimeUnit.SECONDS);
// 在指定延迟后周期性的执行任务, 10s 后开始,每天都在这个时间执行任务
executorService.scheduleAtFixedRate(runnable,10,60*60*24,TimeUnit.SECONDS);
// 在指定延迟后,固定间隔的执行任务, 10s 后开始,执行完任务隔 1 小时后,再次执行任务
executorService.scheduleWithFixedDelay(runnable,10,60*60*1, TimeUnit.SECONDS);
任务组
可以控制一组任务的执行
invokeAny : 任一任务得出结果,即返回结果
ExecutorCompletionService:可以先取出有结果的任务
ExecutorService executorService = Executors.newFixedThreadPool(20);
Callable<Integer> callable1=()->1;
Callable<Integer> callable2=()->2;
//Callable<Integer> callable2=()->{while (true);};
List<Callable<Integer>> callables=new ArrayList();
callables.add(callable1);
callables.add(callable2);
// 执行集合中的所有任务,且任意一个执行完成得出结果,即将结果返回,但是其他未执行完的线程仍会继续执行
// 可以用在比如说多线程分段查找,任何一个线程找到了就可以返回结果了
Integer result = executorService.invokeAny(callables);
// 可以设置超时时间,超时未得出结果会抛出 TimeOutException
executorService.invokeAny(callables,1,TimeUnit.SECONDS);
// 2,1,1,2,2,1,1,1,1,1, 且每次程序运行结果不尽相同
for (int i = 0; i < 10; i++) {System.out.print(executorService.invokeAny(callables)+",");}
// 执行集合中所有任务,并返回 Future 的集合 ,此方法会等所有任务都有结果了才返回
List<Future<Integer>> futures = executorService.invokeAll(callables);
// 与 invokeAny 一样也有带超时参数的版本
executorService.invokeAll(callables,1,TimeUnit.SECONDS);
// 构建一个能收集完成服务执行器,内部会维护一个阻塞队列 BlockingQueue,包含已完成执行的任务的future
ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
for (Callable<Integer> callable : callables) {
executorCompletionService.submit(callable);
}
System.out.println("******");
// take 方法取出一个已经执行完的任务的 future ,如果没有完成的任务,则会阻塞(底层的 BlockingQueue.take() 阻塞了)
executorCompletionService.take().get();
executorCompletionService.take().get();
// 只有两个任务结果已经全部取出,要是再取就阻塞了
//executorCompletionService.take().get();
// 取出一个完成的任务结果,如果没有 返回 null
executorCompletionService.poll();
// 没有结果的时候会等待给定时间 (就是BlockingQueue.poll)
executorCompletionService.poll(2,TimeUnit.SECONDS);
executorService.shutdown();
Fork-Join 框架
Jave SE 7 提供了 fork-join 框架,可以将任务分段并行执行
一个计算1-1000(不含1000)的和 ,用法如下,
public class MyForkJoin {
public static void main(String[] args) {
int start=1;
int end=1000;
// 创建线程池
ForkJoinPool forkJoinPool=new ForkJoinPool();
// 创建任务,
MyTask myTask = new MyTask(start, end);
// 执行任务
forkJoinPool.invoke(myTask);
// 获得任务执行结果
System.out.println(myTask.join());
// 一般算法对比看计算结果是否准确
System.out.println(sum(start,end));
}
static int sum(int start,int end){
int sum=0;
for (int i = start; i < end; i++) {
sum+=i;
}
return sum;
}
}
// 如果需要返回计算结果 则继承 RecursiveTask<T>
// 如果不需要生成结果,则继承 RecursiveAction
class MyTask extends RecursiveTask<Integer> {
// 设置分组每组长度的界限
static int threshold = 10;
// 分组起始
int start;
// 分组结束
int end;
public MyTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
// 小于每组长度就不分了 直接执行计算
if (end - start <= threshold) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += i;
}
return sum;
} else {
// 继续分组
int mid = (start + end) / 2;
MyTask first = new MyTask(start, mid);
MyTask second = new MyTask(mid, end);
// 执行两个子任务 , 这明明是个任务类,那么线程池是从哪里来的呢, ForkJoinWorkerThread 内有一个 pool ,通过 Thread.currentThread() 拿到线程再拿到 pool
invokeAll(first, second);
// 因为这个任务的逻辑是求总和,将两个子任务求和,作为本任务的结果返回
return first.join() + second.join();
}
}
}
CompletableFuture
可以在任务完成后,按顺序执行一些工作的 Future
并没有明白具体怎么用,见 执行器-可完成的Future
// public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
CompletableFuture completableFuture = new CompletableFuture();
// 每个方法都返回 CompletableFuture ,在 Future 完成后执行后面的操作,且这些方法是非阻塞的
completableFuture.thenApply((t)->1);
completableFuture.thenCompose((t)->1);
// completableFuture.thenXxxxx 还有很多方法
// 可以实现一个任务完成后按顺序执行一些工作
completableFuture.thenApply((t)->1).thenCompose((t)->1).thenAccept((t)->{});
网友评论