/**
 * Demo driver that submits one {@link SingleSupplier} task per file name to a
 * thread pool and stops submitting new tasks once any task has reported failure
 * through a shared {@link AtomicBoolean} flag.
 */
public class Client {

    /**
     * Runs {@link #concurrentTest()} once.
     *
     * @param args unused
     * @throws Exception if waiting for the asynchronous results fails
     */
    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.concurrentTest();
    }

    /**
     * Submits a task for each of the generated file names ("0" .. "99999") and
     * waits for every submitted task to finish.
     *
     * <p>The submission loop checks the shared flag before each submit, so file
     * names reached after a failure was recorded are skipped rather than queued.
     *
     * @return {@code true} only if every submitted task returned {@code true}
     * @throws ExecutionException   if a submitted task completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean concurrentTest() throws ExecutionException, InterruptedException {
        List<String> allFileNames = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            allFileNames.add(String.valueOf(i));
        }

        // Shared flag: tasks flip it to false on failure, the loop below reads it.
        AtomicBoolean success = new AtomicBoolean(true);
        // The queue is sized to hold every file name, so a submit can never be rejected.
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 8, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(allFileNames.size()));
        try {
            List<CompletableFuture<Boolean>> futureList = new ArrayList<>();
            // Process each file asynchronously, one task per file name.
            for (String fileName : allFileNames) {
                if (success.get()) {
                    CompletableFuture<Boolean> singleAsyncResult = CompletableFuture.supplyAsync(
                            new SingleSupplier(fileName, success), executorService);
                    futureList.add(singleAsyncResult);
                    System.out.println(fileName + "提交线程池");
                } else {
                    System.out.println("直接不提交" + fileName);
                }
            }

            // Wait for all submitted tasks to finish, then gather their results in order.
            CompletableFuture<Void> allFutures =
                    CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[0]));
            CompletableFuture<List<Boolean>> allCompletableFuture = allFutures.thenApply(
                    ignored -> futureList.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
            List<Boolean> booleans = allCompletableFuture.get();
            System.out.println("获取所有异步结果=" + booleans);
            System.out.println("原子标志位=" + success.get());

            return booleans.stream().allMatch(Boolean::booleanValue);
        } finally {
            // Core threads are non-daemon and never time out; without an explicit
            // shutdown they keep the JVM alive after main() returns.
            executorService.shutdown();
        }
    }
}
@Slf4j
public class SingleSupplier implements Supplier<Boolean> {
private String fileName;
private AtomicBoolean success;
public SingleSupplier(String fileName, AtomicBoolean success) {
this.fileName = fileName;
this.success = success;
}
@Override
public Boolean get() {
boolean res;
try {
if (!this.success.get()) {
Thread.sleep(10);
log.error("其他任务失败,直接停止执行" + this.fileName);
return false;
}
if (fileName.contains("3")) {
res = true;
Thread.sleep(10);
System.out.println("完成:" + this.fileName);
} else if (fileName.contains("1")) {
Thread.sleep(10);
throw new Exception("4 exception");
} else {
Thread.sleep(10);
res = false;
}
} catch (Exception e) {
res = false;
e.printStackTrace();
}
if (!res) {
log.error("测试失败");
this.success.compareAndSet(true, false);
}
return res;
}
}
// 网友评论 (reader-comments heading left over from the source web page; not part of the code)