美文网首页
09-flink-Accumulator(累加器)

09-flink-Accumulator(累加器)

作者: 蜗牛写java | 来源:发表于2019-11-04 19:27 被阅读0次

09-flink-Accumulator(累加器)

概念

Accumulator(累加器):累加器主要作用在用户操作(operate)中收集分布式统计信息或聚合信息。每个并行实例创建并更新其自己的累加器对象,不同并行实例的累加器由系统作业结束后合并。结果可以从作业执行的结果中获得,也可以从web运行时监视器中获得。

分类

  1. IntCounter
  2. LongCounter
  3. DoubleCounter
  4. Histogram
  5. 自定义(实现SimpleAccumulator接口)

用法

  1. 创建累加器:private IntCounter numLines = new IntCounter();

    备注:在operate中使用

  2. 注册累加器:getRuntimeContext().addAccumulator("num-lines", this.numLines);

    备注:operate 实现 Rich...接口

  3. 使用累加器:this.numLines.add(1);

  4. 获取结果:

    JobExecutionResult myJobExecutionResult = env.execute("accumulators Test");
    myJobExecutionResult.getAccumulatorResult("num-lines")
    

例子

public class CounterTest {

    public static void main(String[] args) throws Exception {
        //获取执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //--hostname 10.24.14.193  --port 9000
        final ParameterTool params = ParameterTool.fromArgs(args);
        String hostname = params.has("hostname") ? params.get("hostname") : "localhost";
        int port = params.has("port") ? params.getInt("port") : 9000;

        System.out.println("hostName=" + hostname + " port=" + port);

        //数据来源
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        //operate
        text.map(new RichMapFunction<String, String>() {

            //第一步:定义累加器
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);

                //第二步:注册累加器
                getRuntimeContext().addAccumulator("num-lines", this.numLines);
            }

            @Override
            public String map(String s) throws Exception {
                //第三步:累加
                this.numLines.add(1);
                return s;
            }
        });

        //数据去向
        text.print();

        //执行
        JobExecutionResult socketTest = env.execute("socketTest");

        //第四步:结束后输出总量;如果不需要结束后持久化,可以省去,因为在flinkUI中可以看到
        //String total = socketTest.getAccumulatorResult("num-lines").toString();
    }

sparkUI 查看

flinkUI-Accumulators.png

相关文章

  • 09-flink-Accumulator(累加器)

    09-flink-Accumulator(累加器) 概念 Accumulator(累加器):累加器主要作用在用户操...

  • MongoDB聚合管道——累加器(转)

    累加器(Accumulators) 累加器本来只能使用与project。当在project中使用时,累加器则是针对...

  • Spark累加器

    1. spark 累加器的原理: 2. 系统累加器 2. 自定义累加器(实现单词统计的累加)

  • Spark累加器(Accumulator)

    什么是累加器 累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)累加器用来把Exec...

  • spark之广播变量&累加器

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。累加器...

  • Spark快速入门(9) 高级话题:累加器变量

    本节我们会介绍一种在tasks之间共享可读写变量的方式,就是累加器变量。 累加器变量 累加器变量是在tasks之间...

  • Spark 之累加器

    1. Overview 本文将通过闭包的概念引出累加器,并介绍累加器的基本使用以及如何自定义累加器,文章最后将说明...

  • Spark—累加器

    spark累加器 累加器是一种共享变量,提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是...

  • Python-其他库/模块

    1.collections库-累加器

  • SparkCore之RDD编程进阶

    累加器 累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter(...

网友评论

      本文标题:09-flink-Accumulator(累加器)

      本文链接:https://www.haomeiwen.com/subject/mozdbctx.html