开发环境准备
-
JDK1.8
➜ ~ java -version java version "1.8.0_201" Java(TM) SE Runtime Environment (build 1.8.0_201-b09) Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
-
Maven
➜ ~ mvn -version Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T02:41:47+08:00) Maven home: /opt/apache-maven-3.6.0 Java version: 1.8.0_201, vendor: Oracle Corporation, runtime: /opt/jdk1.8.0_201/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-106-generic", arch: "amd64", family: "unix"
-
IDE
推荐使用 ItelliJ IDEA作为 Flink 应用程序的开发 IDE
创建Maven项目
使用maven模版创建项目
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.1 \
-DgroupId=com.zflylin.demo \
-DartifactId=flink-demo \
-Dversion=0.0.1 \
-Dpackage=com.zflylin.demo \
-DinteractiveMode=false
关于mvn archetype:generate的相关参数,含义如下:
项目相关参数:
参数 | 含义 |
---|---|
groupId | 当前应用程序隶属的Group的ID |
artifactId | 当前应用程序的ID |
package | 代码生成时使用的根包的名字,如果没有给出,默认使用archetypeGroupId |
原型有关参数:
参数 | 含义 |
---|---|
archetypeGroupId | 原型(archetype)的Group ID |
archetypeArtifactId | 原型(archetype)ID |
archetypeVersion | 原型(archetype)版本 |
archetypeRepository | 包含原型(archetype)的资源库 |
archetypeCatalog | archetype分类,这里按位置分类有: ‘local’ 本地,通常是本地仓库的archetype-catalog.xml文件 ‘remote’ 远程,是maven的中央仓库 file://…’ 直接指定本地文件位置archetype-catalog.xml http://…’ or ‘https://…’ 网络上的文件位置 archetype-catalog.xml ‘internal’ 默认值是remote,local |
archetypeVersion | 原型(archetype)版本 |
代码目录如下:
➜ work tree flink-demo
flink-demo
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── zflylin
│ └── demo
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
编写 Flink 程序
public class StreamingJob {
public static void main(String[] args) throws Exception {
// 创建流运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义数据源,这里是一个本地的9999端口的sock数据源
// 对数据源做分组、开窗、聚合操作
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// 打印数据
dataStream.print();
// 运行
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
在9999端口上启动netcat,准备输入
➜ work nc -lk 9999
启动Flink流计算程序, 然后输入单词。
这些输入将作为示例程序的输入。如果要使得某个单词的计数结果大于1,请在5秒钟内重复输入相同的单词(如果5秒钟输入相同单词对你来说太快,请把示例程序中的窗口大小从5秒调大。
➜ work nc -lk 9999
aa bb cc aa
dd cc
输出结果
....
22:08:55,928 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
5> (aa,2)
8> (cc,2)
5> (bb,1)
8> (dd,1)
网友评论