线程池ThreadPool
任务队列+多线程
生产者-消费者模式
image.png
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
// 提交 Runnable 任务
Future<?>
submit(Runnable task);
// 提交 Callable 任务
<T> Future<T>
submit(Callable<T> task);
// 提交 Runnable 任务及结果引用
<T> Future<T>
submit(Runnable task, T result);
2、CompletionService,线程池+结果队列
image.png示例1:
同时发三个请求,收到一个即可
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs =
new ExecutorCompletionService<>(executor);
// 用于保存 Future 对象
List<Future<Integer>> futures =
new ArrayList<>(3);
// 提交异步任务,并保存 future 到 futures
futures.add(
cs.submit(()->geocoderByS1()));
futures.add(
cs.submit(()->geocoderByS2()));
futures.add(
cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则 break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
// 简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
// 取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
示例2:
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
// 并计算最低报价
AtomicReference<Integer> m =
new AtomicReference<>(Integer.MAX_VALUE);
for (int i=0; i<3; i++) {
executor.execute(()->{
Integer r = null;
try {
r = cs.take().get();
} catch (Exception e) {}
save(r);
m.set(Integer.min(m.get(), r));
});
}
return m;
问题:组合操作,竟态条件
Integer o;
do{
o= m. get();
if(o<=r){ break;}
}
while(! m. compareAndSet( o, r));
//或这么改:
for (int i=0; i<3; i++) {
Integer r = logIfError(()->cs.take().get());
executor.execute(()-> save(r));
m.getAndUpdate(v->Integer.min(v, r));
}
return m.get();
异步编程(CompletableFuture)
常见任务关系:串行、并行、and汇聚、or汇聚
image.png
异步编程可以用于上述模型,并且代码语义清晰,更专注业务逻辑
示例:
image.png
Future实现版本:
image.png
// T1Task 需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1 任务需要 T2 任务的 FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1: 洗水壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1: 烧开水...");
TimeUnit.SECONDS.sleep(15);
// 获取 T2 线程的茶叶
String tf = ft2.get();
System.out.println("T1: 拿到茶叶:"+tf);
System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
}
}
// T2Task 需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2: 洗茶壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2: 洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2: 拿茶叶...");
TimeUnit.SECONDS.sleep(1);
return " 龙井 ";
}
}
CompletableFuture版本:
image.png
// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(()->{
System.out.println("T1: 洗水壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1: 烧开水...");
sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
System.out.println("T2: 洗茶壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2: 洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2: 拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 =
f1.thenCombine(f2, (__, tf)->{
System.out.println("T1: 拿到茶叶:" + tf);
System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
});
// 等待任务 3 执行结果
System.out.println(f3.join());
网友评论