本文基于 Scala 2.11 和 Flink 1.12.0 编写。
Maven Dependencies
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.12.0</flink.version>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
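<!-- 从 Flink 1.11 起，flink-streaming-java 不再传递依赖 flink-clients，需要显式添加，否则在本地运行作业时会因找不到 ExecutorFactory 而失败 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 下文的 Table & SQL 示例使用基于 DataSet 的 BatchTableEnvironment，需要 Java bridge 和（旧版）planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- DataStream 示例中的 WordWithCount 使用了 Lombok 注解（@Data 等），还需另行引入 org.projectlombok:lombok 依赖 -->
</dependencies>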
单词计数
DataSet WordCount
/**
 * Batch (DataSet API) word count over a small, fixed set of in-memory lines.
 *
 * <p>Each line is tokenized on runs of non-word characters, every token becomes a
 * {@code (token, 1)} tuple, and the tuples are grouped by the token and their counts summed.
 * The result is written to standard error via {@code printToErr()}, which is the program's
 * only sink; no explicit {@code env.execute()} call is made here.
 */
public class DataSetWordCount {

    public static void main(String[] args) throws Exception {
        // Context in which the Flink program runs.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: three hard-coded sentences.
        DataSet<String> lines = env.fromElements(
                "Flink Spark Storm",
                "Flink Flink Flink",
                "Storm Storm Storm");

        // (word, 1) tuples; group on tuple field 0 (the word) and sum tuple field 1 (the count).
        DataSet<Tuple2<String, Integer>> pairs = lines.flatMap(new LineSplitter());
        DataSet<Tuple2<String, Integer>> counts = pairs.groupBy(0).sum(1);

        counts.printToErr();
    }

    /**
     * Splits a line on runs of non-word characters ({@code \W+}) and emits {@code (token, 1)}
     * for every non-empty token.
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String token : value.split("\\W+")) {
                // A leading separator makes split() return an empty first element; skip it.
                if (token.isEmpty()) {
                    continue;
                }
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
DataStream WordCount
/**
 * Mutable word/count pair used as the element type of the streaming word count.
 *
 * <p>Lombok's {@code @Data} generates getters, setters, {@code equals}/{@code hashCode} and
 * {@code toString}. The two constructor annotations add a no-argument constructor and a
 * {@code (word, count)} constructor.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WordWithCount {
    /** The word being counted. */
    private String word;

    /** Number of occurrences accumulated for {@code word}. */
    private long count;
}
/**
 * Tokenizes a line of text into words and emits a {@link WordWithCount} with count 1 for each.
 *
 * <p>Words are separated by one or more whitespace characters. Empty tokens are dropped so they
 * are never counted as a word. Such tokens come from leading whitespace, consecutive separators
 * or a blank line.
 */
public class WordCountFlatMapFunction implements FlatMapFunction<String, WordWithCount> {
    @Override
    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
        // "\\s+" treats a run of whitespace as one separator; a single-character "\\s" split
        // would turn "a  b" into "a", "", "b". split() can still yield a leading "" when the
        // line starts with whitespace (or is empty), hence the isEmpty() guard.
        for (String word : value.split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    }
}
/**
 * Merges two partial counts into one by adding their counts.
 *
 * <p>The word is taken from the first argument. When applied after {@code keyBy} on the word,
 * as in the streaming example, both arguments carry the same word.
 */
public class WordCountReduceFunction implements ReduceFunction<WordWithCount> {
    @Override
    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
        final String word = a.getWord();
        final long total = a.getCount() + b.getCount();
        return new WordWithCount(word, total);
    }
}
/**
 * Streaming word count over text read from a socket. Counts are aggregated per word in
 * 5-second tumbling processing-time windows.
 *
 * <p>Usage: {@code DataStreamWordCount [host [port]]}. Without arguments it connects to
 * {@code 127.0.0.1:9000}, as before. A non-numeric port argument raises
 * {@link NumberFormatException}.
 */
public class DataStreamWordCount {

    private static final String DEFAULT_HOST = "127.0.0.1";
    private static final int DEFAULT_PORT = 9000;

    public static void main(String[] args) throws Exception {
        // Optional overrides for the socket endpoint; defaults keep the original behavior.
        final String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        final int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Listen on the socket; each received line becomes one String record.
        DataStream<String> text = env.socketTextStream(host, port);

        // Split into words, key by word, then sum counts inside each 5-second window.
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new WordCountFlatMapFunction())
                .keyBy(WordWithCount::getWord)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new WordCountReduceFunction());

        // Print sink restricted to parallelism 1.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Windows WordCount");
    }
}
FlinkTable & SQL WordCount
可以将 DataSet、DataStream 转换为 Table（表），注册为视图后用 SQL 查询；查询结果还可以再转换回 DataSet/DataStream。
/**
 * Word count with the batch Table API and SQL.
 *
 * <p>A DataSet of {@link WC} records is converted to a Table and registered as the view
 * {@code WordCount}. The view is aggregated with a SQL {@code GROUP BY}, and the result is
 * converted back to a DataSet and printed to standard error.
 */
public class TableWordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // One (word, 1) record per token of the sample sentence.
        String[] words = "Hello Flink Hello World".split("\\W+");
        ArrayList<WC> list = new ArrayList<>();
        for (int i = 0; i < words.length; i++) {
            list.add(new WC(words[i], 1));
        }
        DataSet<WC> input = fbEnv.fromCollection(list);

        // DataSet -> Table, naming the fields explicitly.
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // Register the Table under a name so the SQL query can refer to it.
        fbTableEnv.createTemporaryView("WordCount", table);
        Table result = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

        // Table -> DataSet<WC>, printed to stderr.
        fbTableEnv.toDataSet(result, WC.class).printToErr();
    }

    /**
     * Word/frequency record with public fields and a public no-argument constructor.
     * NOTE(review): this shape is presumably chosen so Flink can treat it as a POJO — confirm.
     */
    public static class WC {
        public String word;
        public long frequency;

        public WC() {
        }

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        /** Renders the record as {@code "<word>, <frequency>"}. */
        @Override
        public String toString() {
            return new StringBuilder()
                    .append(word)
                    .append(", ")
                    .append(frequency)
                    .toString();
        }
    }
}
网友评论