java8-流

作者: lesline | 来源:发表于2018-10-14 23:17 被阅读11次

    2、java8: 流

    reactor

    流是JavaAPI的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理。

    示例

    示例一:取热量小于400,并按卡路里排序

    1. Java7
        public static List<String> getLowCaloricDishesNamesInJava7(List<Dish> dishes){
            List<Dish> lowCaloricDishes = new ArrayList<>();
            for(Dish d: dishes){
                if(d.getCalories() < 400){
                    lowCaloricDishes.add(d);
                }
            }
            List<String> lowCaloricDishesName = new ArrayList<>();
            Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
                public int compare(Dish d1, Dish d2){
                    return Integer.compare(d1.getCalories(), d2.getCalories());
                }
            });
            for(Dish d: lowCaloricDishes){
                lowCaloricDishesName.add(d.getName());
            }
            return lowCaloricDishesName;
        }
    
    1. java8实现
        public static List<String> getLowCaloricDishesNamesInJava8(List<Dish> dishes){
            return dishes.stream()
                    .filter(d -> d.getCalories() < 400)
                    .sorted(comparing(Dish::getCalories))
                    .map(Dish::getName)
                    .collect(toList());
        }
    

    为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream():

    流概念

    简短的定义就是“从支持数据处理操作的源生成的元素序列”。


    流的构成.png

    特点:
    只遍历一次:一个流只能遍历一次
    内部迭代:steams库使用内部迭代把迭代做了,不需要通过用户编写代码进行外部迭代

    操作

    中间操作和终端操作

    中间操作和终端操作.png

    操作分类

    筛选:filter/distinct
    截短流:limit
    跳过元素:skip
    映射:map、flatmap
    查找:anyMatch/noneMatch/findAny/findFirst
    归约:reduce/collect

    map/flatmap的区别:

            List<Integer> mapResult = Stream.of(1, 2)
                    .map(number -> number + 1).collect(toList());
    
            List<Integer> flatMapResult = Stream.of(Arrays.asList(1, 2), Arrays.asList(3, 4))
                    .flatMap(numbers -> numbers.stream()).collect(toList());
    

    map:
    操作流中的每一个元素,(入参:出参=1:1)
    flatmap:
    仍操作流中的每一个元素,但流中的元素是列表,flatmap的Function入参是列表,出参是是流,将流合并作为执行结果是)(入参:出参=1:n)
    一言以蔽之,flatmap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。

            Arrays.asList("Hello", "World").stream()
                    .map(line -> line.split(""))
                    .flatMap(line -> Arrays.stream(line))
                    .distinct()
                    .forEach(System.out::println);
    

    执行过程如下图:


    flatmap执行过程.png

    归约求最大值、最小值、和

            //求和
            List<Integer> numbers = Arrays.asList(3, 4, 5, 1, 2);
            int sum = numbers.stream().reduce(0, (a, b) -> a + b);
            int sum1 = numbers.stream().reduce(0,  (a, b) -> Integer.sum(a, b));
            int sum2 = numbers.stream().reduce(0, Integer::sum);
              int sum3 = numbers.stream().mapToInt(Integer::intValue).sum();
    
    
    
            //最大值
            int max = numbers.stream().reduce(0, Integer::max);
    
            //最小值
            Optional<Integer> min = numbers.stream().reduce(Integer::min);
    

    构建流

    由值、数组、集合生成流水
    由文件生成流
    由函数生成流:创建无限流

    用流收集数据

    收集器

    收集器用于将stream中的元素做汇总,传递给collect方法的参数是Collector接口的一个实现。

    collect(Collectors.toList()):按顺序给每个元素生成一个列表;
    collect(Collectors.groupingBy(Transaction::getCurrency)):生成一个map,它的键是货币,值是等于该货币的列表。
    按币种对交易进行分组:

        //建立累积交易分组的Map
            Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
            //迭代Transaction的List
            for (Transaction transaction : transactions) {
                //提取Transaction的货币
                Currency currency = transaction.getCurrency();
                List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
                //如果分组Map中没有这种货币的条目,就创建一个
                if (transactionsForCurrency == null) {
                    transactionsForCurrency = new ArrayList<>();
                    transactionsByCurrencies.put(currency, transactionsForCurrency);
                }
                //将当前遍历的Transaction加入同一货币的Transaction的List
                transactionsForCurrency.add(transaction);
            }
    
            Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().collect(groupingBy(Transaction::getCurrency));
    

    预定义的收集器
    从Collectors类提供的工厂方法(例如groupingBy)创建的收集器。
    它们主要提供了三大功能:

    • 将流元素归约和汇总为一个值
    • 元素分组
    • 元素分区
      自定义收集器
      自已实现Collector接口实现收集器功能

    归约和汇总

              //总和
            long howManyDishes = menu.stream().collect(counting());
    
            //平均
            double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));
            
              //最大值
            Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
            Optional<Dish> mostCaloricDish = menu.stream().collect(maxBy(dishCaloriesComparator));
    
            //汇总 最大、最小、平均、总和  IntSummaryStatistics{count=9, sum=4300, min=120, average=477.777778, max=800}
            IntSummaryStatistics intSummaryStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));
    
    

    连接字符串

            //连接字符串 1,2,3
            String str = Arrays.asList(1, 2, 3).stream().map(String::valueOf).collect(joining(","));
    
            int sum1 = menu.stream().collect(Collectors.summingInt(Dish::getCalories));
            int sum2 = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, (i, j) -> i + j));
            int sum3 = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, Integer::sum));
            int sum4 = menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();
            int sum5 = menu.stream().mapToInt(Dish::getCalories).sum();
    

    注意:stream接口的collect和reduce方法通常可以获取相同的结果,尽可能的为手头的问题探索不同的解决方案,但在通用的方案中,始终选择最专门化的一个。

    分组

    分组示例:

            //在鱼类型分类
            Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));
            
            //把热量不到400卡路里的菜划分为“低热量”(diet),热量400到700卡路里的菜划为“普通”(normal),高于700卡路里的划为“高热量”(fat)。
            Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
                    groupingBy(dish -> {
                        if (dish.getCalories() <= 400) return CaloricLevel.DIET;
                        else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                        else return CaloricLevel.FAT;
                    }));
    

    多级分组:

      Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishedByTypeAndCaloricLevel = menu.stream().collect(
                    groupingBy(Dish::getType,
                            groupingBy((Dish dish) -> {
                                if (dish.getCalories() <= 400) return CaloricLevel.DIET;
                                else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                                else return CaloricLevel.FAT;
                            })
                    )
            );
    

    多级分组过程:


    多级分组.png

    按子组收集数据

            //每类菜有多少个
            Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));
    
            //每类菜的总卡路里数
            Map<Dish.Type, Integer> sumCaloriesByType = menu.stream().collect(groupingBy(Dish::getType,
                    summingInt(Dish::getCalories)));
    
            //每类菜的最高热量的菜
            Map<Dish.Type, Optional<Dish>> mostCaloriesByType = menu.stream().collect(groupingBy(Dish::getType,
                    maxBy(Comparator.comparingInt(Dish::getCalories))));
    
            //每类菜的最高热量的菜-去除Optional
            Map<Dish.Type, Dish> mostCaloricDishesByTypeWithoutOptional = menu.stream().collect(
                    groupingBy(Dish::getType,
                            collectingAndThen(
                                    maxBy(Comparator.comparingInt(Dish::getCalories)),
                                    Optional::get)));
    

    分区

    分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组,false是一组。

            Map<Boolean, List<Dish>> partitionByVegeterian = menu.stream().collect(partitioningBy(Dish::isVegetarian));
    

    结果:{false=[pork, beef, chicken, prawns, salmon], true=[French fries, rice, season fruit, pizza]}

    并行流

    可以通过对收集源调用parallelStream方法来把集合转换为并行流。
    并行流内部使用了默认的ForkJoinPool(分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。
    // 使用这个属性可以修改默认的线程数
    System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,”20”);
    这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。

    public static long parallelSum(List<Long> list) {
        return list.stream().parallel().reduce(Long::sum).get();
    }
    

    使用并行流时注意事项:
    1、要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高
    2、尽量不要有共享变更的修改,如果有,要注意线程安全(加锁的话,会出现中午执行)

    相关文章

      网友评论

          本文标题:java8-流

          本文链接:https://www.haomeiwen.com/subject/xffxzftx.html