美文网首页干货分享
Java分析——使用异步编排任务和线程池

Java分析——使用异步编排任务和线程池

作者: 小白菜aaa | 来源:发表于2020-09-20 16:04 被阅读0次

    | 一、如何创建线程池? |

    1、七大参数介绍

    | 1)corePoolSize |

    核心线程数,一直存在线程池中(除非设置了allowCoreThreadTimeOut),创建好就等待就绪,去执行任务

    | 2)maximumPoolSize |

    最大线程数,设置最大线程数是为了控制资源

    | 3)keepAliveTime |

    存活时间,如果当前的线程数大于核心线程数,并且线程空闲的时间大于存活时间了,则会执行释放线程的操作。(释放的数量为:maximumPoolSize - corePoolSize)

    | 4)unit |

    时间单位

    | 5)workQueue |

    阻塞队列,如果任务有很多,就会将目前多的任务放到队列中,当有空闲的线程时,就会从队列中取出新的任务继续执行。

    | 6)threadFactory |

    线程的创建工厂

    | 7)handler |

    拒绝策略,如果队列满了,按照我们指定的拒绝策略拒绝执行任务

    有哪些拒绝策略?

    1. DiscardOldestPolicy
      如果有新的任务进来就会丢去最旧的未执行的任务

    2. AbortPolicy
      直接丢弃新任务,抛出异常

    3. CallerRunsPolicy
      如果有新任务进来,直接调用run()方法,同步执行操作

    1. DiscardPolicy
      直接丢弃新进来的任务,不会抛出异常
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    
    // 常见的创建线程的方式
    // 1)Executors . newCachedThreadApol() // 核心为0,所有都可回收的线程池
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    // 2)Ехесutоrѕ . nеwFіхеdТhrеаdРооl() 固定大小的线程池,不会过期
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    // 3)Executors . newScheduledThreadPool() 定时任务的线程池
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    // 4)Executors . newSingleThreadExecutor() 单线程的线程池,后台从队列中获取任务,一个一个执行
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
    }
    
    

    问:一个corePoolSize=7 maximumPoolSize=20 workQueue=50的线程池,如果本次有100个并发进来,是如何执行的?

    答:7个会立即被执行,50个会进入队列,然后会另外开13个新的线程,剩余的30个线程就需要看当前线程池的拒绝策略了。

    | 二、CompletableFeture异步编排 |

    1、runAsync 创建异步对象的方式

        // 1)无返回值的异步操作
        public static CompletableFuture<Void> runAsync(Runnable runnable) {
                return asyncRunStage(asyncPool, runnable);
        }
        // 2)无返回值的异步操作,可指定线程池
        public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                           Executor executor) {
                return asyncRunStage(screenExecutor(executor), runnable);
        }
        
        // 3)有返回值的异步操作
        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
                return asyncSupplyStage(asyncPool, supplier);
        }
        
        // 4)有返回值的异步操作,可指定线程池
        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                               Executor executor) {
                return asyncSupplyStage(screenExecutor(executor), supplier);
        }
    

    2、whenComplete 计算完成时回调的方法

    1)方法介绍

        // 上一个异步完成时执行该方法,和上一个任务用同一个线程
        public CompletableFuture<T> whenComplete(
            BiConsumer<? super T, ? super Throwable> action) {
            return uniWhenCompleteStage(null, action);
        }
        // 上一个异步完成时执行该方法,异步的方式执行
        public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action) {
            return uniWhenCompleteStage(asyncPool, action);
        }
        // 上一个异步完成时执行该方法,异步的方式执行,可以自己指定线程池
        public CompletableFuture<T> whenCompleteAsync(
            BiConsumer<? super T, ? super Throwable> action, Executor executor) {
            return uniWhenCompleteStage(screenExecutor(executor), action);
        }
    
        // 处理异常
        public CompletableFuture<T> exceptionally(
            Function<Throwable, ? extends T> fn) {
            return uniExceptionallyStage(fn);
        }
    
    

    2)示例代码

    // 示例代码线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);
    
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程号 -> " + Thread.currentThread().getId());
        int n = 10 / 0;
        return n;
    }, executor).whenComplete((result,excption) -> {
        System.out.println("运行结果:" + result + "异常:" + excption);
    }).exceptionally(throwable -> { 
        // 出现异常 exceptionally感知并处理异常,返回最终结果
            return 10;
    });
    
    Integer integer = future.get();
    System.out.println("最终运行结果:" + integer);  // 10
    
    

    3、handleAsync 方法

    1)方法介绍

        // 上一个方法执行后作出的处理
        public <U> CompletableFuture<U> handle(
            BiFunction<? super T, Throwable, ? extends U> fn) {
            return uniHandleStage(null, fn);
        }
    
        public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn) {
            return uniHandleStage(asyncPool, fn);
        }
    
        public <U> CompletableFuture<U> handleAsync(
            BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
            return uniHandleStage(screenExecutor(executor), fn);
        }
    
    

    2)示例代码

    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程号 -> " + Thread.currentThread().getId());
        int n = 10 / 0;
        return n;
    }, executor).handle((res, exception) -> {
        if (res != null) {
            // 如果上一个任务没出现异常,修改返回结果
            return res * 10;
        }
        if (exception != null) {
            // 上一个任务出现了异常
            return 0;
        }
        return 0;
    });
    
    

    4、线程串行化方法

    1)方法介绍

    thenApply方法:当一个线程依赖另一个 线程时,狱取上一个任务返回的结果,开返回当前任务的返回值。

    thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

    thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作

    
        public <U> CompletableFuture<U> thenApply(
            Function<? super T,? extends U> fn) {
            return uniApplyStage(null, fn);
        }
    
        public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T,? extends U> fn) {
            return uniApplyStage(asyncPool, fn);
        }
    
        public <U> CompletableFuture<U> thenApplyAsync(
            Function<? super T,? extends U> fn, Executor executor) {
            return uniApplyStage(screenExecutor(executor), fn);
        }
    
        public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
            return uniAcceptStage(null, action);
        }
    
        public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
            return uniAcceptStage(asyncPool, action);
        }
    
        public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                       Executor executor) {
            return uniAcceptStage(screenExecutor(executor), action);
        }
    
        public CompletableFuture<Void> thenRun(Runnable action) {
            return uniRunStage(null, action);
        }
    
        public CompletableFuture<Void> thenRunAsync(Runnable action) {
            return uniRunStage(asyncPool, action);
        }
    
        public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                    Executor executor) {
            return uniRunStage(screenExecutor(executor), action);
        }
    
    

    2)示例代码

    | ① thenRunAsync |

    thenRunAsync 不能获取上一步执行结果

        // thenRunAsync 不能获取上一步执行结果
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程号 -> " + Thread.currentThread().getId());
            int n = 10 / 0;
            return n;
        }, executor).thenRunAsync(() -> {
            System.out.println("线程2运行了!");
        }, executor);
    
    

    | ② thenAcceptAsync |

    thenAcceptAsync可以获取上一个任务执行的结果,但是无法对其进行修改

        // thenAcceptAsync可以获取上一个任务执行的结果,但是无法对其进行修改
       CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程号 -> " + Thread.currentThread().getId());
            int n = 10;
            return n;
        }, executor).thenAcceptAsync((res) -> {
            // 如果上一个任务产生异常或者执行失败,则不执行该任务
            System.out.println("上一个任务获取的结果:" + res);
        }, executor);
    
    

    | ③ thenApplyAsync |

    thenApplyAsync 可以获取上一个任务返回的结果,并对其进行修改再返回

     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程号 -> " + Thread.currentThread().getId());
                int n = 10;
                return n;
            }, executor).thenApplyAsync((res) -> {
                return res * 2;
            }, executor);
    
            Integer result = future.get();
    
            System.out.println("最终返回结果:" + result);
    
    

    5、组合任务,一个完成

    1)方法介绍

    applyToEitherAsync:阻塞等待,只要有一个任务完成了,就执行该任务

       public <U> CompletableFuture<U> applyToEitherAsync(
            CompletionStage<? extends T> other, Function<? super T, U> fn,
            Executor executor) {
            return orApplyStage(screenExecutor(executor), other, fn);
        }
    
    

    2)示例代码

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程号 -> " + Thread.currentThread().getId());
        int n = 5;
        // 模拟这个任务比较慢完成,让future2先完成,测试applyToEitherAsync 只要有一个任务完成就执行
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return n;
    }, executor);
    
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程号 -> " + Thread.currentThread().getId());
        int n = 10;
        return n;
    }, executor);
    
    future1.applyToEitherAsync(future2, res -> {
        System.out.println(res);
        return res + 1;
    }, executor);
    
    

    6、组合任务,所有的完成

    1)方法介绍

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
            return andTree(cfs, 0, cfs.length - 1);
    }
    
    

    2)示例代码

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1当前线程号 -> " + Thread.currentThread().getId());
        int n = 5;
        return n;
    }, executor);
    
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2当前线程号 -> " + Thread.currentThread().getId());
        int n = 10;
        return n;
    }, executor);
    
    CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
    // 阻塞等待所有的任务执行完成
    allOf.get();
    
    Integer result1 = future1.get();
    Integer result2 = future2.get();
    
    

    让我们来试试项目中如何使用异步编排吧!

    | 三、异步编排实际开发 |

    1、配置线程池

    @ConfigurationProperties(prefix = "coke.thread")
    @Component
    @Data
    public class ThreadPoolProperties {
        private Integer coreSize;
        private Integer maxSize;
        private Integer keepAliveTime;
    }
    
    
    //@EnableConfigurationProperties(ThreadPoolProperties.class)  如果没有把线程池的常量配置类放到容器中,则使用该注解
    @Configuration
    public class MyThreadConfig {
    
        @Bean
        public ThreadPoolExecutor threadPoolExecutor(ThreadPoolProperties pool) {
            return new ThreadPoolExecutor(
                    pool.getCoreSize(),
                    pool.getMaxSize(),
                    pool.getKeepAliveTime(),
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(100000),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
        }
    }
    
    

    3、示例代码

    public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
    
        SkuItemVo skuItemVo = new SkuItemVo();
    
        // supplyAsync 需要返回结果  因为 3 4 5 依赖1
        CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
            // 1、获取sku基本信息 pms_sku_info
            SkuInfoEntity skuInfoEntity = getById(skuId);
            skuItemVo.setInfo(skuInfoEntity);
            return skuInfoEntity;
        }, executor);
    
        CompletableFuture<Void> saleFuture = infoFuture.thenAcceptAsync((res) -> {
            // 3、获取spu的销售属性组合
            List<SkuItemSaleAttrVo> skuItemSaleAttrVos = skuSaleAttrValueService.getSaleAttrsBySpuId(res.getSpuId());
            skuItemVo.setSaleAttrs(skuItemSaleAttrVos);
        }, executor);
    
        CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
            // 4、获取spu的介绍 pms_spu_info_desc
            SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
            skuItemVo.setDesp(spuInfoDescEntity);
        }, executor);
    
        CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
            // 5、获取spu的规格参数信息
            List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getCatalogId(), res.getSpuId());
            skuItemVo.setAttrGroups(attrGroupVos);
        }, executor);
    
        CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
            // 2、获取sku的图片信息 pms_spu_images
            List<SkuImagesEntity> skuImagesEntities = skuImagesService.getImageBySkuId(skuId);
            skuItemVo.setImages(skuImagesEntities);
        }, executor);
    
        // 6、查询当前sku是否参与秒杀优惠
        CompletableFuture<Void> secKillFuture = CompletableFuture.runAsync(() -> {
            R skuSecKillInfo = secKillFeignService.getSkuSecKillInfo(skuId);
            if (skuSecKillInfo.getCode() == 0) {
                SecKillInfoVo skuSecKillInfoData = skuSecKillInfo.getData(new TypeReference<SecKillInfoVo>() {
                });
                skuItemVo.setSecKillInfoVo(skuSecKillInfoData);
            }
        }, executor);
    
        // 等到所有任务都完成
        CompletableFuture.allOf(saleFuture, descFuture, baseAttrFuture, imageFuture, secKillFuture).get();
    
        return skuItemVo;
    }
    
    

    结尾

    本文到这里就结束了,感谢看到最后的朋友,都看到最后了点个赞再走啦,如有不对之处还请多多指正。

    相关文章

      网友评论

        本文标题:Java分析——使用异步编排任务和线程池

        本文链接:https://www.haomeiwen.com/subject/xhxpyktx.html