取自 https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/datastream_api.html
含pom文件,未搬运过来。本文核心收获: Flink编程=图计算。
package wikiedits;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception {
//第一步:创建一个StreamExecutionEnvironment (批处理任务则是创建一个ExecutionEnvironment).
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
//下一步:创建一个 Wikipedia IRC 日志数据源,从其读取数据:
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
//本例用于展示某个时间周期(5秒)内,每个用户增加的内容字节数,或者删除的内容字节数,
//首先,我们确定使用流里面的用户名作为key,用 KeySelector 来指定:
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
//其次,我们指定事件窗口的时长为5秒(无重叠),
DataStream<Tuple2<String, Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
acc.f0 = event.getUser();//Tuple的概念用过python的同学一定不会陌生。就是固定不能修改的数组。
acc.f1 += event.getByteDiff();//在这5秒的窗口里,一个用户可能编辑多次(每次可能是增加了一定的字数、或删除了一定的字数),给它累加到一起。
return acc;
}
});
//第三,我们让它往console打印出来:
result.print();
see.execute();//最后,我们启动Flink的job。
//补充知识: 数据源的创建、数据格式的转化、sink操作们,这些操作都仅仅是定义一个图,而execute() 才真正运行这个图。
//图(graph)是图论里的概念。tensorflow等deeplearning工具里也有这个概念。
//在flink中,图可以在集群上运行,也可以在本地单个flink实例上运行。
}
}
打印示例:
1> (02:1980:8144:1591:0:0:0:1,-7)
1> (OxonAlex,279)
1> (Jimj wpg,-238)
3> (David Eppstein,-38)
3> (3.81.88.252,-2)
4> (A.lanzetta,-2)
4> (LoveFromBJM,191)
4> (ClueBot III,198)
4> (Mclarge90,-8)
4> (Abductive,22)
4> (A00:23C4:7312:8A00:4D8F:7AAD:921E:2707,5)
2> (Tombury89,0)
3> (Dainamo,-59)
3> (Eerie Holiday,0)
4> (ZxxZxxZ,295)
含义:每5秒输出一次,每次是这5秒里编辑Wikipedia的每个用户增减的字节数。
网友评论