美文网首页程序员
从Java 8 stream 到 rxjava, 记录一次数据流

从Java 8 stream 到 rxjava, 记录一次数据流

作者: 黄大海 | 来源:发表于2018-03-19 19:04 被阅读126次

    在java8之前, 一直都是用guava中的Iterables和FluentIterables来处理数据流。 java8 的 lambda 和 方法引用 极大的简化了内部类的处理。
    不过stream还是比较初级的,实际使用过程中只能处理简单的数据流任务。主要遇到的问题是,在调用消费方法(例如forEach, collect)之后。该流即为终结状态,无法再复用。 无法优雅地需要处理大量中间结果的复杂计算。

    于是试用了一下rxjava, 底层是订阅发布模型,上层可以用来处理数据流。
    实现引入框架,我们用到了extension中的数学方法

            <dependency>
                <groupId>io.reactivex.rxjava2</groupId>
                <artifactId>rxjava</artifactId>
                <version>2.1.10</version>
            </dependency>
            <dependency>
                <groupId>com.github.akarnokd</groupId>
                <artifactId>rxjava2-extensions</artifactId>
                <version>0.18.8</version>
            </dependency>
    

    下面这段代码对集合的某个Decimal字段求和

            BigDecimal sigma = flowables
                .map(CalculateContext::getTempValue1)
                .reduce(BigDecimal.ZERO, (sum, each) -> sum.add(each))
                .blockingGet();
    

    筛除记录、排序、幷缓存结果(下次使用不会再运行之前的回调)

    flowable = flowable.filter(each -> each.someCondition  == true)
                .sorted(Comparator.comparing(EachType::getTempValue).reversed())
                .cache();
    

    合并两个流, 映射到某个字段,然后取最大值。这里用到了扩展包

    Flowable<Type> flowable = Flowable
                .merge(flowable1, flowable2)
                .map(Type::getValue)
                .to(MathFlowable::max)
                .blockingSingle();
    

    项目中没有用到flatMap。 这个也是极常用的操作。还有一些高级特性包括背压、线程控制 暂时没有涉及,以后有这样的场景机会的时候再尝试下 :)

    相关文章

      网友评论

        本文标题:从Java 8 stream 到 rxjava, 记录一次数据流

        本文链接:https://www.haomeiwen.com/subject/fhgpqftx.html