美文网首页JAVA相关
Stream 学习笔记(中)

Stream 学习笔记(中)

作者: freeseawind | 来源:发表于2018-08-13 13:04 被阅读171次
    开发环境
    • eclipse 4.7.3a
    • jdk 10
    前置知识点
    关于 Reduction

    Stream包含许多最终操作,比如: average、sum、min、max、count,它们通过通过组合流的内容返回一个值。这些操作我们暂时称之为 “统计操作“(Reduction)。

    下面我们来分别介绍两种不同的 “统计操作“ 方式:

    • Stream.reduce
    • Stream.collect

    关于Reduction的命名,有更好的提议可以留言告知。

    Stream.reduce 方法

    如下所示,该管道计算集合中男性成员年龄的总和,它使用 Stream.sum 方法来进行 “统计操作“

    Integer totalAge = roster.stream().mapToInt(Person::getAge).sum();
    

    使用Stream.reduce操作来计算相同的值

    roster.stream().map(Person::getAge).reduce(0, (a, b) -> a + b).intValue();
    
    Stream.reduce方法包含两个参数:
    • identity:“统计操作“ 的结果,如果流中没有元素,则identity元素既是初始值又是默认结果。
    • accumulator: 累加器函数有两个参数:统计的部分结果(在本例中,到目前为止所有处理过的整数的总和)和流的下一个元素(在本例中为整数), 它返回一个新的中间值。 在此示例中,accumulator函数是一个lambda表达式,它添加两个Integer值并返回一个Integer值。

    如果reduce操作涉及向集合添加元素,那么每次accumulator函数处理元素时,它都会创建一个包含元素的新集合,这可能会影响性能。

    Stream.collect 方法

    与reduce方法不同,reduce方法在处理元素时始终创建新值,而collect方法修改或改变现有值。
    考虑如何计算平均值,需要两类数据:总记录数和记录的总和。collect方法只创建一次新值,用来跟踪总记录数和记录的总和,例如以下类Averager:

    static class Averager implements IntConsumer
    {
        private int total = 0;
        private int count = 0;
    
         public double average()
         {
              return count > 0 ? ((double) total) / count : 0;
          }
    
          public void accept(int i)
          {
              total += i;
              count++;
          }
    
          public void combine(Averager other)
          {
              total += other.total;
              count += other.count;
          }
     }
    

    如下所示,该管道使用collect方法计算集合中男性成员年龄的总和

    // collect
    Averager averageCollect = roster.stream().filter(p -> p.getGender() == Person.Sex.MALE).map(Person::getAge).
    collect(Averager::new, Averager::accept, Averager::combine);
    
     System.out.println("Average age of male members: " + averageCollect.average());
    
    Stream.collect 方法包含三个参数:
    • supplier: 工厂对象,返回结果容器中,用来存储计算的中间结果。在此示例中,返回的是Averager的实例。
    • accumulator:累加器,函数将流元素合并到结果容器中。 在此示例中,它通过将count变量递增1并将total元素的值进行累加,并保存到Averager结果容器中。
    • combiner: 组合器函数接受多个结果容器并合并其内容,如果存在的话。
    Stream.collect 方法的其它用例
    过滤结果集
    List<Integer> list = roster.stream()
    .filter(p -> p.getGender() == Person.Sex.MALE). map(Person::getAge).collect(Collectors.toList());
    
    按关键字分组
    Map<Person.Sex, List<Person>> byGender =
        roster
            .stream()
            .collect(
                Collectors.groupingBy(Person::getGender));
    
    分组汇总
    Map<Person.Sex, Double> averageAgeByGender = roster
        .stream()
        .collect(
            Collectors.groupingBy(
                Person::getGender,                      
                Collectors.averagingInt(Person::getAge)));
    
    源码学习

    查看下面代码的完整调用示例

    Map<Person.Sex, Double> averageAgeByGender0 = roster.stream()
                    .collect(Collectors.groupingBy(Person::getGender, Collectors.averagingInt(Person::getAge)));
    
    Collector 部分源码
    public interface Collector<T, A, R> {
    
        Supplier<A> supplier();
    
        BiConsumer<A, T> accumulator();
    
        BinaryOperator<A> combiner();
    
        Function<A, R> finisher();
    }
    
    通过Collector接口声明,我们可以看到几个关键的方法申明:
    • supplier:结果容器工厂方法
    • accumulator:累加器工厂方法,返回累加器实例
    • combiner:组合函数工厂方法,返回组合函数接口实例
    • finisher:终止操作工厂方法,返回终止操作函数接口实例
    泛型说明
    • T:参与累加器计算的对象类型
    • A:结果容器对象类型
    • R : 终止操作返回的结果类型
    Collectors.averagingInt 源码
    public static <T> Collector<T, ?, Double>
        averagingInt(ToIntFunction<? super T> mapper) {
            return new CollectorImpl<>(
                    () -> new long[2],
                    (a, t) -> { a[0] += mapper.applyAsInt(t); a[1]++; },
                    (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
                    a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
    }
    
    通过Collector接口可知,Collectors.averagingInt方法是如何实现求平均值的
    1. 创建long数组保存中间计算结果
    2. 调用accumulator方法进行累计,并把中间结果存储到long数组中
    3. 如果需要合并多个中间结果,则把两个元素的结果进行汇总保存到第一个流元素中
    4. 计算并得到平均值
    Collectors.groupingBy源码
    public static <T, K, D, A, M extends Map<K, D>>
        Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                      Supplier<M> mapFactory,
                                      Collector<? super T, A, D> downstream) {
            Supplier<A> downstreamSupplier = downstream.supplier();
            BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
            BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
                K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                downstreamAccumulator.accept(container, t);
            };
            BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
            @SuppressWarnings("unchecked")
            Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
    
            if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
                return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
            }
            else {
                @SuppressWarnings("unchecked")
                Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
                Function<Map<K, A>, M> finisher = intermediate -> {
                    intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                    @SuppressWarnings("unchecked")
                    M castResult = (M) intermediate;
                    return castResult;
                };
                return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
            }
    }
    
    源码剖析

    了解相应泛型对应本例的类型

    • T:Person对象
    • K:Person.Sex
    • D:Double
    • A:long[]
    • M:Map<Person.Sex, Double>
    探讨部分

    探讨点一

    # 入参部分
    Supplier<M> mapFactory
    
    # 后续又进行了强制类型转换
    Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
    

    从源码可以看出 mapFactory工厂方法应返回Map<Person.Sex, Long[]> 对象,所以上述方法参数是否为如下会更加明了

    public static <T, K, D, A, M extends Map<K, D>>
        Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                      Supplier<K, A> mapFactory,
                                      Collector<? super T, A, D> downstream)
    

    探讨点二

    Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
    Function<Map<K, A>, M> finisher = intermediate -> {
        intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
        @SuppressWarnings("unchecked")
        M castResult = (M) intermediate;
        return castResult;
    };
    

    源码中终止方法又进行了两次强制类型转换

    1. 为了调用intermediate.replaceAll,对downstream方法进行了一次强制类型转换
    2. 为了返回类型约束的结果对M进行了一次强制类型转换

    重构如下

    # 修改方法的声明
    public static <T, K, D, A> Collector<T, ?, Map<K, D>> groupingBy(....)
    
    # 修改结果返回函数
    @SuppressWarnings("unchecked")
    Function<Map<K, A>, Map<K, D>> finisher = intermediate ->
    {
        Map<K, D> castResult = new HashMap<>();
    
        intermediate.entrySet().stream()
            .forEach(t -> castResult.put(t.getKey(), downstream.finisher().apply(t.getValue())));
             return castResult;
    };
    

    以上仅为个人观点,水平有限,欢迎大家指正。

    关于泛型,它的初衷是让具有强制类型转换的代码具有更好的安全性和可读性。

    Github工程地址

    相关文章

      网友评论

        本文标题:Stream 学习笔记(中)

        本文链接:https://www.haomeiwen.com/subject/eamybftx.html