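Program Overview
Flink's basic programming model is to read data from a source, process it as a stream, and write the results to a sink. Following the official recommendation, our first Flink program is the WordCount example. It reads strings from a network socket, splits them into words, counts each word, and prints the counts to the console.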
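Before bringing in Flink, here is the source → transform → sink shape as a plain-Java analogy. It uses only `java.util.stream` on a bounded, in-memory list, so it is not Flink's API. The list stands in for the socket source, `flatMap` plus `groupingBy` stand in for the Flink operators, and `System.out` stands in for the print sink. The class name `WordCountModelSketch` is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WordCountModelSketch {

    // "Transform" step: split lines into words and count each word.
    public static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                // flatMap: one input line -> zero or more words
                .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
                // an empty or blank line splits into a single empty token; drop it
                .filter(word -> !word.isEmpty())
                // group identical words and count them (TreeMap keeps keys sorted)
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // "Source": a fixed list standing in for lines read from a socket
        List<String> source = Arrays.asList("hello flink", "hello world");
        // "Sink": print each (word, count) pair to the console
        countWords(source).forEach((word, count) -> System.out.println(word + " -> " + count));
    }
}
```

Unlike Flink's streaming job, this runs once over a finite input. The streaming version below has to decide *when* to emit counts, which is what the time window is for.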
Project Setup
Create a new Maven project in IntelliJ IDEA.
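Import the Dependency JARs
Add the dependencies Flink needs to pom.xml. The `_2.10` suffix on the streaming and client artifacts is the Scala version they were built against.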
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cc</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>
</project>
Implementation
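import model.WordWithCount;
import operator.ParseInputMessgaeOperator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Preconditions;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // Port of the local socket to read from (nc listens on it; this program connects as a client)
        int port = 9000;
        // The port can also be passed as a program argument, e.g. --port 9000
        // (this needs an import of org.apache.flink.api.java.utils.ParameterTool)
        //ParameterTool parameterTool = ParameterTool.fromArgs(args);
        //port = parameterTool.getInt("port");
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read text from the socket on the given local port, splitting records on "\n"
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", port, "\n");
        // Use the flatMap operator to parse each line and flatten it into <word, count=1> records
        SingleOutputStreamOperator<WordWithCount> streamFlat = streamSource.flatMap(new ParseInputMessgaeOperator());
        SingleOutputStreamOperator<WordWithCount> streamResult = streamFlat
                // Group records that have the same word
                //.keyBy("word")
                .keyBy((KeySelector<WordWithCount, String>) WordWithCount::getWord)
                // Collect each group into 5-second windows
                .timeWindow(Time.seconds(5L))
                // Aggregate the records in each window by summing their counts
                //.sum("count");
                .reduce((ReduceFunction<WordWithCount>) (wordWithCount, t1) -> {
                    // After keyBy, both records must carry the same word
                    Preconditions.checkState(wordWithCount.getWord().equals(t1.getWord()), "error");
                    // Add up the counts
                    wordWithCount.setCount(wordWithCount.getCount() + t1.getCount());
                    return wordWithCount;
                });
        // Print the results to the console
        streamResult.print();
        // Note: Flink builds the job lazily, so nothing above runs until execute() is called
        env.execute("streaming word count");
    }
}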
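With a single argument, `timeWindow(Time.seconds(5L))` defines a tumbling window: consecutive, non-overlapping 5-second windows aligned to the epoch. This Flink version defaults to processing time, so each record lands in the window that covers the wall-clock time at which the window operator sees it. When that window closes, `reduce` emits one combined `WordWithCount` per word. The plain-Java sketch below imitates that assignment and aggregation on a list of hypothetical `(arrivalMillis, word)` events. It ignores parallelism, timers and state, and the `Event` type and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Hypothetical record: a word plus the processing time (ms) at which it arrived.
    public static final class Event {
        final long arrivalMillis;
        final String word;

        public Event(long arrivalMillis, String word) {
            this.arrivalMillis = arrivalMillis;
            this.word = word;
        }
    }

    // Start of the epoch-aligned tumbling window that contains t (assumes t >= 0).
    public static long windowStart(long t, long sizeMillis) {
        return t - (t % sizeMillis);
    }

    // windowStart -> (word -> count): roughly what keyBy(word) + window + reduce(sum) produces.
    public static Map<Long, Map<String, Integer>> countPerWindow(List<Event> events, long sizeMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.arrivalMillis, sizeMillis);
            // The reduce step: fold this record's count (always 1) into the running sum.
            result.computeIfAbsent(start, k -> new TreeMap<>())
                  .merge(e.word, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1_000, "hello"));
        events.add(new Event(3_500, "flink"));
        events.add(new Event(4_999, "hello"));
        events.add(new Event(5_000, "hello")); // falls into the next window [5000, 10000)
        countPerWindow(events, 5_000).forEach((start, counts) ->
                System.out.println("[" + start + ", " + (start + 5_000) + ") " + counts));
        // prints:
        // [0, 5000) {flink=1, hello=2}
        // [5000, 10000) {hello=1}
    }
}
```

The same word typed 4 seconds apart can therefore be counted in one window or split across two, depending on where the 5-second boundary falls.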
package model;

/**
 * Created By qiuzhi. Description: Date: 2020-06-30 Time: 5:28 PM
 *
 * @author zhanghaichao
 * @date 2020/06/30
 * Data structure: a word and its count
 */
public class WordWithCount {
    private String word;
    private int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "WordWithCount{" +
                "word='" + word + '\'' +
                ", count=" + count +
                '}';
    }
}
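The public no-argument constructor and the getter/setter pairs let Flink treat `WordWithCount` as a POJO. The commented-out `keyBy("word")` and `sum("count")` variants in `WordCount` address fields by name, and they rely on that. Flink's documented rules are roughly these: the class is public (and static if nested), it has a public no-argument constructor, and every field is either public or reachable through `getFoo()`/`setFoo()` methods. The reflection helper `looksLikePojo` below is a hypothetical, approximate check of those rules only. It looks at the class's own fields and ignores serializer support and `is`-style boolean getters, so Flink's own type extraction remains the authority.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRuleSketch {

    // Mirrors the article's WordWithCount: expected to pass the check.
    public static class WordWithCount {
        private String word;
        private int count;
        public WordWithCount() {}
        public WordWithCount(String word, int count) { this.word = word; this.count = count; }
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }

    // Same shape but without a no-argument constructor: expected to fail the check.
    public static class NoDefaultCtor {
        private String word;
        public NoDefaultCtor(String word) { this.word = word; }
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
    }

    // Hypothetical helper: approximate check of Flink's documented POJO rules.
    public static boolean looksLikePojo(Class<?> type) {
        int classMods = type.getModifiers();
        if (!Modifier.isPublic(classMods)) {
            return false;
        }
        // A nested class must be static so it can be created without an outer instance.
        if (type.getEnclosingClass() != null && !Modifier.isStatic(classMods)) {
            return false;
        }
        try {
            type.getConstructor(); // public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            // Static and transient fields are not serialized; public fields need no accessors.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue;
            }
            String name = f.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);
                type.getMethod("set" + suffix, f.getType());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(WordWithCount.class)); // true
        System.out.println(looksLikePojo(NoDefaultCtor.class)); // false
    }
}
```

If a type fails these rules, Flink falls back to a generic serializer, and name-based field expressions such as `keyBy("word")` can no longer be used. The `KeySelector` form in `WordCount` does not depend on them.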
package operator;

import model.WordWithCount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Created By qiuzhi. Description: Date: 2020-06-30 Time: 5:26 PM
 *
 * @author zhanghaichao
 * @date 2020/06/30
 */
public class ParseInputMessgaeOperator implements FlatMapFunction<String, WordWithCount> {
    @Override
    public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
        // Validate the input
        if (s == null || s.isEmpty()) {
            return;
        }
        // Split the input on runs of whitespace
        for (String word : s.split("\\s+")) {
            // A leading space still yields one empty token; skip it
            if (word.isEmpty()) {
                continue;
            }
            // Emit one record per word
            WordWithCount wordWithCount = new WordWithCount(word, 1);
            // Send the record downstream for further processing
            collector.collect(wordWithCount);
        }
    }
}
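The tokenizing step can be tried without Flink. `String.split` takes a regular expression, and `\\s` matches exactly one whitespace character, so two consecutive spaces produce an empty token that would be counted as a word. The sketch below compares that with splitting on `\\s+` and skipping empty tokens. Even with `\\s+`, a leading space yields one empty first token, so the emptiness check is still needed. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitSketch {

    // Mirrors a flatMap body: split a line into words, skipping empty tokens.
    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        if (line == null || line.isEmpty()) {
            return words;
        }
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) { // a leading separator still yields one empty token
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        String line = "hello  flink"; // two spaces between the words
        System.out.println(Arrays.toString(line.split("\\s"))); // [hello, , flink]
        System.out.println(tokenize(line));                     // [hello, flink]
        System.out.println(tokenize(" hello"));                 // [hello]
    }
}
```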
Results
- Start a listener on port 9000 of the local machine:
  nc -l 9000
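- Run the code. There are three ways to deploy and run the program:
  - Run the main method directly in the IDE. Started this way, getExecutionEnvironment() returns a local environment, which starts an embedded Flink cluster and submits the job to its JobManager. The results appear in the console. This is the easiest way to debug, so later examples will generally be run this way.
  - Package the project into a jar with Maven and run the jar in one of two ways. Both are described in the earlier article 启航 - flink入门案列-任务提交和运行 (Job Submission and Running).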
- Type some words into the nc session on port 9000.
- When each 5-second window closes, the word counts for that window are printed.