在学习技术时,总会有一个简单程序 Demo 带着我们入门,所以参考着官网例子,带大家快速熟悉 Flink 的 Hello World~
说明一下,项目运行的环境如下:
OS : Mac
Flink Version : 1.9
IDE : IDEA
Java Version : 1.8
下面来讲下关于环境准备,如果是 Windows 的用户,请参照每个步骤,找到适应自己的安装 or 启动方法。
环境准备
首先我们默认已经安装了 Jdk 1.8 和编码工具 IDEA,下面来讲如何安装 Flink 和建立脚手架。下面展示的项目代码已经放入了 Github[1],可以下载进行本地运行
安装 Flink
$brew install apache-flink
检查安装是否成功以及版本号
$ flink--versionVersion: 1.9.0,CommitID:9c32ed9
接着以单机集群模式启动 Flink
$sh/usr/local/Cellar/apache-flink/1.9.0/libexec/bin/start-cluster.shStartingcluster.Starting standalonesession daemononhost yejingqideMBP-c510.Starting taskexecutor daemononhost yejingqideMBP-c510.
然后访问 localhost:8081 监控界面(1.9 版本更新了 UI):
创建项目
这里推荐的是使用 maven 进行构建,在命令行中输入如下内容(# 号后面是说明,请不要输入):
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \# flink 的 group.id-DarchetypeArtifactId=flink-quickstart-java \# flink 的 artifact.id-DarchetypeVersion=1.9.0\# flink 的 version,以上三个请不要修改,按照默认即可-DgroupId=wiki-edits \# 项目的 group.id-DartifactId=wiki-edits \# 项目的 artifact.id-Dversion=0.1\# 项目的 version.id-Dpackage=wikiedits \# 项目的基础包名-DinteractiveMode=false# 是否需要和用户交互以获得输入,由于上面已经自己写了项目的参数,所以禁用了。反之请删掉 上面项目的配置,将交互模式设为 true
如果按照官方的例子填写,那么你将得到如下的项目结构:
$ tree wiki-editswiki-edits/├── pom.xml└── src └── main ├── java │ └── wikiedits │ ├── BatchJob.java│ └── StreamingJob.java└── resources └── log4j.properties
如果是自己自定义的,包结构会不一致,但是通过脚手架创立的,pom 文件中预置的依赖都将一致,引入了 Flink 基础开发相关的 API,然后通过 IDEA 打开该项目目录,就可以开始我们的 Hello world。
开始项目
首先交代一下待会的流程,编写程序代码,启动 netcat 命令来监听 9000 端口,启动或提交 Flink 程序,最后监听日志输出信息。
1、项目代码
Demo 的代码作用是监听 netcat 输入的字符,然后进行聚合操作,最后输出字符统计
publicclass SocketTextStreamWordCount {publicstaticvoidmain(String[] args)throwsException {StringhostName ="127.0.0.1";intport =9000;// 设置运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource stream = env.socketTextStream(hostName, port);// 计数SingleOutputStreamOperator> sum = stream .flatMap((newLineSplitter())) .keyBy(0) .sum(1);// 输出sum.print();// 提交任务env.execute("Java Word from SocketTextStream Example"); }publicstaticfinalclass LineSplitter implements FlatMapFunction> { @OverridepublicvoidflatMap(Strings, Collector> collector)throwsException {String[] tokens = s.toLowerCase().split("\\W+");for(Stringtoken : tokens) {if(token.length() >0) { collector.collect(newTuple2(token,1)); } } } }}
简单说明一下,上面出现了 SocketTextStream 套接字字符 数据源(Source),接着是 算子(Operator):FlatMap(一个输入源,可以输出零个或多个结果)、KeyBy(按照某字段或者 tuple 元组中某个下标进行分类) 和 sum(跟翻译一样,就是进行聚合汇总) ,最后输出
2、开启 tcp 长链接
为了模拟流数据,我们造的场景是不断往 9000 端口输入字符,Flink 程序添加的数据源是 SocketTextStream (套接字字符流)。
在你的终端中输入以下命令
$ nc -l 9000
有关 netcat 命令的用法,请看参考资料第二条,这里的作用就是打开 TCP 长链接,监听 9000 端口
3、启动 Flink 程序
刚才第一个步骤中,已经编辑好了程序代码,第二个步骤也已经启动了一个 TCP 客户端,启动 Flink 程序有两种方法:
3.1、本地调试
使用 IDEA 的好处很多,代码补全,语法检查和快捷键之类的。我经常使用的调试方法就是添加一个 psvm 的 main 方法,在里面写执行代码,最后点击绿色的启动按钮~
如果不需要调试,想直接看结果,选择第一个 Run,但有时不确定代码执行过程和出错的具体原因,可以通过第二个选项 Debug 进行调试。
这是本地开发经常使用的方法,进行结果的验证。
3.2、提交到 JobManager
前面我们启动的是单机集群版,启动了一个 JobManager 和 TaskWorker,打开的 localhost:8081 就是 JobManager 的监控面板,所以我们要通过下面的方式,将 Flink程序提交到 JobManager。
这里教一个简单的方法,我们通过 mvn clean package 进行打包后,可以在 IDEA 集成的终端标签栏下提交我们的程序:
由于每个人的绝对路径都不一样,所以我们通过 IDEA 的终端,它会自动定位到项目的路径,然后执行时填写相对路径的 jar 包名字即可
$ flink run -c cn.sevenyuan.wordcount.SocketTextStreamWordCounttarget/flink-quick-start-1.0-SNAPSHOT.jar
-c 参数是指定运行的主程序入口,接着我们去查看监控面板,可以发现任务状态已经处于监控中:
顶部信息讲的是运行程序名字、时间、时间线、配置参数等信息,底下 Name 一栏,说明该程序逻辑步骤(读取数据源,进行映射处理,使用 keyBy 和聚合运算,最后输出到【打印 sink】)
4、输入数据 & 验证结果
前面验证了程序正常启动,接下来我们来验证输入和输出
先来监听输出,进入 Flink 的日志目录,接着通过 tail 命令监听任务执行者 TaskWorkder(默认会启动一个任务执行者,所以编码为 0) 的日志输出
$usr/local/Cellar/apache-flink/1.9.0/libexec/log$ tail -400f flink*-taskexecutor-0*.out
接着,在 nc -l 9000 对应的终端窗口中输入如下数据:
$ nc -l 9000hello worldtest worldtest hellohello my world
最后就能够看到以下输出结果:
(hello,1)(world,1)(test,1)(world,2)(test,2)(hello,2)(hello,3)(my,1)(world,3)
每行字符以空格进行分割,然后分别进行汇总统计,得到的输出结果一致。
扩展阅读
如果你在官网阅览,应该也曾看到过 TimeWindow 时间窗口的例子,下面是 Demo 代码
publicclass SocketWindowWordCount {publicstaticvoidmain(String[] args)throwsException {// the port to connect toStringhostName ="127.0.0.1";intport =9000;// get the execution environmentfinalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// get input data by connecting to the socketDataStreamtext= env.socketTextStream("localhost", port,"\n");// parse the data, group it, window it, and aggregate the countsDataStream windowCounts =text.flatMap(newFlatMapFunction() { @OverridepublicvoidflatMap(Stringvalue, Collector out) {for(Stringword : value.split("\\s")) { out.collect(newWordWithCount(word,1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce(newReduceFunction() { @OverridepublicWordWithCount reduce(WordWithCount a, WordWithCount b) {returnnewWordWithCount(a.getWord(), a.getCount() + b.getCount()); } });// print the results with a single thread, rather than in parallelwindowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); }}
这里的程序代码核心点在于,比之前的多了一个算子 timeWindow,并且有两个参数,分别是时间窗口大小以及滑动窗口大小(Time size, Time slide),下面是简单的输入和输出示意图:
由于滑动窗口大小是 1s,窗口是有重合的部分,然后每秒统计自己所在窗口的数据(5s 内传输过来的数据),可以看到第 6s 时,已经舍弃掉第 0s 输入的字符串数据。
小伙伴们也可以修改一下时间窗口大小和滑动窗口大小,然后输入自定义的数据,进行不同参数的设置,看下输出效果如何,是否有达到自己的预期。
这里先初步接触一下 时间(Time)和窗口(Window)概念,之后慢慢接触逐步加深理解吧。
总结
本文基于 Mac 系统、 Apache Flink 1.9 版本进行了项目搭建和 Demo 编写,介绍了 Suorce -> Transformation -> Sink 的流程。简单的实现了一个字符计数器,往套接字数据源 SocketTextStream,源源不断的输入,然后进行统计出现的次数,如有疑惑或不对之处请与我讨论~
项目地址
https://github.com/Vip-Augus/flink-learning-note[2]
git clone https://github.com/Vip-Augus/flink-learning-note
原文:https://mp.weixin.qq.com/s/lz4igq815ZUK14rcZcHc1g
作者:惊奇
来源:微信公众号
网友评论