美文网首页Java之家多线程
Java多线程之批量发起请求

Java多线程之批量发起请求

作者: 小小土豆dev | 来源:发表于2023-11-29 00:25 被阅读0次

    日常开发中可能会遇到批量发起请求的场景,如:从某个服务器拉取大批量数据,如果一次拉取,数据量太大。采用分页拉取方式,一页一页拉取,比较耗时,此时我们可以批量的同时去拉数据。

    如何发起批量请求呢?可以使用Java多线程去实现。每个线程拉取数据的执行时间可能不一致,我们希望先执行完任务的线程可以优先返回数据,不是等待所有线程都执行完,才返回数据。

    Future方式

    public static void sleep(long time) {
        try {
            TimeUnit.SECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void test() {
        long start = System.currentTimeMillis();
    
        //定义ExecutorService
        ExecutorService executor = Executors.newFixedThreadPool(3);
    
        //定义批量任务,每个任务的耗时不等
        final List<Callable<Integer>> tasks = Arrays.asList(
                () -> {
                    sleep(30L);
                    System.out.println("Task 30 completed done.");
                    return 30;
                },
                () -> {
                    sleep(10L);
                    System.out.println("Task 10 completed done.");
                    return 10;
                },
                () -> {
                    sleep(20L);
                    System.out.println("Task 20 completed done.");
                    return 20;
                }
        );
    
        //批量提交执行异步任务,
        try {
            List<Future<Integer>> futures = executor.invokeAll(tasks);
            futures.forEach(future -> {
                try {
                    System.out.println("返回结果: " + future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
            executor.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        long end = System.currentTimeMillis();
        long seed = end - start;
        System.out.format("seed=%s", seed);
    }
    

    执行结果:

    Task 30 completed done.
    Task 10 completed done.
    Task 20 completed done.
    返回结果: 30
    返回结果: 10
    返回结果: 20
    seed=30068
    

    我们无法优先获取最先执行完任务线程的结果,而是等到耗时(30s)最长的任务执行完毕后才可以拿到结果。

    CompletableFuture方式

    public static void test2() {
        long start = System.currentTimeMillis();
    
        List<String> list = Arrays.asList("1", "2", "3");
        List<CompletableFuture<String>> futures = list.stream()
                .map(item ->
                        CompletableFuture.supplyAsync(() -> {
                                    if ("1".equals(item)) {
                                        sleep(10L);
                                    } else if ("2".equals(item)) {
                                        sleep(30L);
                                    } else {
                                        sleep(20L);
                                    }
    
                                    System.out.println("thread name: " + Thread.currentThread().getName() + " task: " + item);
                                    return "任务" + item;
                                }
                        )).collect(Collectors.toList());
    
        futures.forEach(future -> {
            future.thenAccept(result -> {
                System.out.println("返回结果: " + result);
            });
        });
    
        futures.forEach(future -> {
            future.join();
        });
    
        long end = System.currentTimeMillis();
        long seed = end - start;
        System.out.format("seed=%s", seed);
    }
    

    执行结果:

    thread name: ForkJoinPool.commonPool-worker-9 task: 1
    返回结果: 任务1
    thread name: ForkJoinPool.commonPool-worker-11 task: 3
    返回结果: 任务3
    thread name: ForkJoinPool.commonPool-worker-2 task: 2
    返回结果: 任务2
    seed=30079
    

    可以优先拿到最先执行完任务线程的执行结果。

    CompletionService方式

    public static void test3() {
        long start = System.currentTimeMillis();
        //定义ExecutorService
        ExecutorService executor = Executors.newFixedThreadPool(3);
    
        //定义批量任务,每个任务的耗时不等
        final List<Callable<Integer>> tasks = Arrays.asList(
                () -> {
                    sleep(30L);
                    System.out.println("Task 30 completed done.");
                    return 30;
                },
                () -> {
                    sleep(10L);
                    System.out.println("Task 10 completed done.");
                    return 10;
                },
                () -> {
                    sleep(20L);
                    System.out.println("Task 20 completed done.");
                    return 20;
                }
        );
    
        //批量提交执行异步任务,
        try {
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
            tasks.forEach(completionService::submit);
    
            for (int i = 0; i < tasks.size(); i++) {
                try {
                    System.out.println("返回结果: " + completionService.take().get());
    
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            executor.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    
        long end = System.currentTimeMillis();
        long seed = end - start;
        System.out.format("seed=%s", seed);
    }
    

    执行结果:

    Task 10 completed done.
    返回结果: 10
    Task 20 completed done.
    Task 30 completed done.
    返回结果: 20
    返回结果: 30
    seed=30064
    

    可以优先拿到最先执行完任务线程的执行结果。

    推荐

    https://juejin.cn/post/6844904195162636295
    https://www.jianshu.com/p/2528550e94a9

    相关文章

      网友评论

        本文标题:Java多线程之批量发起请求

        本文链接:https://www.haomeiwen.com/subject/vphvfltx.html