美文网首页
CompletableFuture 异步编程基础

CompletableFuture 异步编程基础

作者: Tinyspot | 来源:发表于2024-04-15 10:24 被阅读0次

    1. CompletableFuture 类

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    }
    

    1.1 CompletableFuture 工厂方法

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    允许指定自定义的 Executor 来执行异步任务

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
    allOf() 主要用于并行执行多个异步任务,并等待所有任务都完成

    1.2 CompletionStage 接口

    public interface CompletionStage<T> {
        // 异步回调
        public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
        public CompletionStage<Void> thenAccept(Consumer<? super T> action);
        public CompletionStage<Void> thenRun(Runnable action);
        public <U> CompletionStage<U> thenCompose
            (Function<? super T, ? extends CompletionStage<U>> fn);
    
        // 组合
        public <U,V> CompletionStage<V> thenCombine
            (CompletionStage<? extends U> other,
             BiFunction<? super T,? super U,? extends V> fn);
    }
    

    2. 异步计算任务 supplyAsync()

    2.1 supplyAsync() 方法

    @Test
    public void demo() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });
    
        try {
            // 阻塞,等待 future 完成
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    
    private final ExecutorService executor = new ThreadPoolExecutor(5, 20, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10));
    
    @Test
    public void joinDemo() {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> System.currentTimeMillis(), executor);
        Long time = future.join();
    }
    

    2.2 示例

    @Test
    public void demo() throws Exception {
        ArrayList<String> strings = Lists.newArrayList("111", "222");
    
        List<CompletableFuture<String>> futures = strings.stream()
                .map(str -> CompletableFuture.supplyAsync(() -> str))
                .collect(Collectors.toList());
    
        List<String> collect = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        log.info("result:" + collect);
    }
    

    2.3 回调函数 thenAccept()

    @Test
    public void demo() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });
    
        // 注册回调函数,当异步任务完成时打印结果
        future.thenAccept(result -> {
            System.out.println(result);
        });
    }
    

    thenAccept() 注册一个回调函数,当 CompletableFuture 完成时,该函数将接收到结果字符串并将其打印出来

    2.4 模拟耗时操作

    @Test
    public void thenAccept() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync: Hello");
            return "Hello";
        });
    
        log.info("start");
        // 回调函数
        future.thenAccept(str -> log.info(str));
        log.info("end");
    
        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    

    3. 异步计算任务 runAsync()

    @Test
    public void runAsync() {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            log.info("runAsync start...");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("runAsync end...");
        });
    
        log.info("callback1...");
        // 回调函数
        CompletableFuture<Void> callbackFuture = future.thenRun(() -> {
            log.info("thenRun start...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("thenRun end...");
        });
        log.info("callback2...");
    
        // future.join();
        callbackFuture.join();
    }
    

    打印结果:

    09:34:34.881 [main] INFO com.juc.pool.Demo - callback1...
    09:34:34.881 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - runAsync start...
    09:34:34.885 [main] INFO com.pool.Demo - callback2...
    09:34:37.888 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - runAsync end...
    09:34:37.888 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - thenRun start...
    09:34:39.890 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - thenRun end...
    

    4. allOf()

    4.1 等待

    @Test
    public void demo() {
    
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            log.info("future1 start");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("future1 end");
            return "future1";
        });
    
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            log.info("future2 start");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("future2 end");
            return "future2";
        });
    
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
        CompletableFuture<Void> thenFuture = allFuture.thenAccept(unused -> {
            String join1 = future1.join();
            String join2 = future2.join();
            System.out.println(join1 + ", " + join2);
        });
    
        log.info("end....");
        
        // 阻塞,等待回调执行完成
        // thenFuture.join();
        try {
            thenFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    

    4.1 若改为直接 join()

    CompletableFuture.allOf(future1, future2).join();
    String join1 = future1.join();
    String join2 = future2.join();
    System.out.println(join1 + ", " + join2);
    

    思考:
    CompletableFuture.allOf().join()CompletableFuture.allOf().thenAccept().join() 区别与联系

    阻塞时机:
    直接 join() 立即阻塞,直到所有任务完成;
    而 thenAccept().join() 先注册回调,异步执行回调,最后阻塞等待回调执行完成

    结果处理:
    直接 join() 不涉及结果处理;
    thenAccept().join() 在所有任务完成后,执行特定的回调逻辑来处理结果

    5. join() vs get()

    相同点:
    两者都是为了获取由CompletableFuture封装的异步操作完成后产生的最终结果

    区别:
    异常处理,get() 抛出一个受检异常(checked exception 必须处理), join() 抛出未检查异常(unchecked exception)

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }
    
    public T join() {
        Object r;
        return reportJoin((r = result) == null ? waitingGet(false) : r);
    }
    

    6. 计算

    @Test
    public void calculateDemo() {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i < 1000001; i += 5000) {
            values.add(i);
        }
    
        List<CompletableFuture<List<Integer>>> futures = new ArrayList<>();
        for (Integer value : values) {
            futures.add(CompletableFuture.supplyAsync(() -> getList(value)));
        }
    
        // List<List<Integer>> collect = futures.stream().map(CompletableFuture::join)
        //         .collect(Collectors.toList());
    
        List<Integer> list = futures.stream().map(CompletableFuture::join)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    
        System.out.println("collect.size: " + list.size());
    }
    
    public static List<Integer> getList(Integer value) {
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            values.add(i + value);
        }
        return values;
    }
    

    相关文章

      网友评论

          本文标题:CompletableFuture 异步编程基础

          本文链接:https://www.haomeiwen.com/subject/cprdxjtx.html