美文网首页
Java并发编程-关于CompletableFuture的实验

Java并发编程-关于CompletableFuture的实验

作者: 苏wisdom | 来源:发表于2021-02-09 20:04 被阅读0次

    1 背景

    最近的项目中,有这样的场景:
    根据商品id列表,分别调用不同服务查询这批商品的价格、库存、卖点等信息,然后将这批商品信息组合返回。

    都是根据商品id查询的不同服务,如果拿着id挨个服务去串行执行(先查询价格服务->再查询库存服务->再查询卖点服务...),由于这些服务都是远程调用,很明显性能很低,而且后面的服务查询不依赖前面的查询结果,明显没有串行执行的必要。

    2 场景模拟

    2.1 服务准备

    简化的商品VO:

    @Data
    public class ItemVO {
        private Long itemId;
        private Long price;
        private Long quantity;
        private List<String> sellings;
    }
    

    模拟价格查询器:

    public class PriceQueryService {
        public Map<Long, Long> queryPrice(List<Long> itemIds) {
            Map<Long, Long> result = new HashMap<>(itemIds.size());
            ThreadLocalRandom random = ThreadLocalRandom.current();
            itemIds.forEach(id -> {
                Long price = random.nextLong(30000);
                result.put(id, price);
            });
          // 模拟延迟
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        }
    }
    

    模拟库存查询器:

    public class QuantityQueryService {
        public Map<Long, Long> queryQuantity(List<Long> itemIds) {
            Map<Long, Long> result = new HashMap<>(itemIds.size());
            ThreadLocalRandom random = ThreadLocalRandom.current();
            itemIds.forEach(id -> {
                Long price = random.nextLong(100);
                result.put(id, price);
            });
          // 模拟延迟
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        }
    }
    

    模拟卖点信息查询器:

    public class SellingsQueryService {
        public Map<Long, List<String>> querySellingText(List<Long> itemIds) {
            Map<Long, List<String>> result = new HashMap<>(itemIds.size());
            itemIds.forEach(id -> {
                List<String> text = Arrays.asList("可拆装","刺绣");
                result.put(id, text);
            });
          // 模拟延迟
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        }
    }
    

    2.2 CompletableFuture调用各个服务

    串行执行,1264毫秒左右

    public class QueryItemInfoService {
        private static final Logger log = LoggerFactory.getLogger(QueryItemInfoService.class);
    
        public static void main(String[] args) {
            log.info("start");
            Long start = System.currentTimeMillis();
            List<Long> itemIds = Arrays.asList(31332L, 23123L, 43834L, 34442L, 23213L, 434112L, 45342L);
            PriceQueryService priceQueryService = new PriceQueryService();
            QuantityQueryService quantityQueryService = new QuantityQueryService();
            SellingsQueryService sellingsQueryService = new SellingsQueryService();
    
            Map<Long, Long> priceMap = priceQueryService.queryPrice(itemIds);
            Map<Long, Long> quantityMap = quantityQueryService.queryQuantity(itemIds);
            Map<Long, List<String>> sellingsMap = sellingsQueryService.querySellingText(itemIds);
    
            List<ItemVO> itemVOS = itemIds.stream().map(
                    itemId -> {
                        ItemVO itemVO = new ItemVO();
                        itemVO.setItemId(itemId);
                        itemVO.setPrice(priceMap.getOrDefault(itemId, 0L));
                        itemVO.setQuantity(quantityMap.getOrDefault(itemId, 0L));
                        itemVO.setSellings(sellingsMap.getOrDefault(itemId, Collections.emptyList()));
                        return itemVO;
                    }
            ).collect(Collectors.toList());
            
            Long end = System.currentTimeMillis();
            log.info("end, duration = {}", end - start);
        }
    }
    

    异步执行, 550毫秒左右

    public class QueryItemInfoService {
        private static final Logger log = LoggerFactory.getLogger(QueryItemInfoService.class);
    
        public static void main(String[] args) {
            log.info("start");
            Long start = System.currentTimeMillis();
            List<Long> itemIds = Arrays.asList(31332L, 23123L, 43834L, 34442L, 23213L, 434112L, 45342L);
            PriceQueryService priceQueryService = new PriceQueryService();
            QuantityQueryService quantityQueryService = new QuantityQueryService();
            SellingsQueryService sellingsQueryService = new SellingsQueryService();
    
            CompletableFuture<Map<Long, Long>> priceFuture = CompletableFuture.supplyAsync(() -> priceQueryService.queryPrice(itemIds));
            CompletableFuture<Map<Long, Long>> quantityFuture = CompletableFuture.supplyAsync(() -> quantityQueryService.queryQuantity(itemIds));
            CompletableFuture<Map<Long, List<String>>> sellingsFuture = CompletableFuture.supplyAsync(() -> sellingsQueryService.querySellingText(itemIds));
    
            Map<Long, Long> priceMap = priceFuture.join();
            Map<Long, Long> quantityMap = quantityFuture.join();
            Map<Long, List<String>> sellingsMap = sellingsFuture.join();
    
            List<ItemVO> itemVOS = itemIds.stream().map(
                    itemId -> {
                        ItemVO itemVO = new ItemVO();
                        itemVO.setItemId(itemId);
                        itemVO.setPrice(priceMap.getOrDefault(itemId, 0L));
                        itemVO.setQuantity(quantityMap.getOrDefault(itemId, 0L));
                        itemVO.setSellings(sellingsMap.getOrDefault(itemId, Collections.emptyList()));
                        return itemVO;
                    }
            ).collect(Collectors.toList());
    
    
            Long end = System.currentTimeMillis();
            log.info("end, duration = {}", end - start);
        }
    }
    

    3 深入分析

    CompletableFuture 是Java8引入的,实现了Future和CompletionStage两个接口。弥补了Future类无法手动完成,无法链式调用等问题。

    常用的两个方法是runAsync和supplyAsync,两个方法的区别是,runAsync的入参是实现了Runnable接口的方法,也就是说没有返回值,返回的是CompletableFuture<Void>;supplyAsync入参是实现了Supplier接口的方法,也就是说有返回值。

    4 assemble

    相关文章

      网友评论

          本文标题:Java并发编程-关于CompletableFuture的实验

          本文链接:https://www.haomeiwen.com/subject/pbecxltx.html