09-flink-Accumulator(累加器)
概念
Accumulator(累加器):累加器主要作用在用户操作(operate)中收集分布式统计信息或聚合信息。每个并行实例创建并更新其自己的累加器对象,不同并行实例的累加器由系统作业结束后合并。结果可以从作业执行的结果中获得,也可以从web运行时监视器中获得。
分类
- IntCounter
- LongCounter
- DoubleCounter
- Histogram
- 自定义(实现SimpleAccumulator接口)
用法
-
创建累加器:
private IntCounter numLines = new IntCounter();
备注:在operate中使用
-
注册累加器:
getRuntimeContext().addAccumulator("num-lines", this.numLines);
备注:operate 实现 Rich...接口
-
使用累加器:
this.numLines.add(1);
-
获取结果:
JobExecutionResult myJobExecutionResult = env.execute("accumulators Test"); myJobExecutionResult.getAccumulatorResult("num-lines")
例子
public class CounterTest {
public static void main(String[] args) throws Exception {
//获取执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//--hostname 10.24.14.193 --port 9000
final ParameterTool params = ParameterTool.fromArgs(args);
String hostname = params.has("hostname") ? params.get("hostname") : "localhost";
int port = params.has("port") ? params.getInt("port") : 9000;
System.out.println("hostName=" + hostname + " port=" + port);
//数据来源
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
//operate
text.map(new RichMapFunction<String, String>() {
//第一步:定义累加器
private IntCounter numLines = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//第二步:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
}
@Override
public String map(String s) throws Exception {
//第三步:累加
this.numLines.add(1);
return s;
}
});
//数据去向
text.print();
//执行
JobExecutionResult socketTest = env.execute("socketTest");
//第四步:结束后输出总量;如果不需要结束后持久化,可以省去,因为在flinkUI中可以看到
//String total = socketTest.getAccumulatorResult("num-lines").toString();
}
sparkUI 查看
![](https://img.haomeiwen.com/i14179336/010f5de9fa8a3c64.png)
网友评论