开发模式
使用 idea 编写 java 程序的方式运行 Flink程序方式就是
开发模式
。
安装
程序我们肯定不会在本地运行,所以需要配置服务器
- 上传安装包
$ scp flink-1.13.1-bin-scala_2.12.tgz admin@hadoop102:/opt/software
admin@hadoop102's password:
flink-1.13.1-bin-scala_2.12.tgz 100% 291MB 64.8MB/s 00:04
- 解压
tar -zxvf flink-1.13.1-bin-scala_2.12.tgz -C /opt/module/
- 切换目录
cd /opt/module/
- 重命名
mv flink-1.13.1/ flink
Flink 配置
- 根目录
bin 程序脚本
conf 配置文件
examples 官方demo
lib 依赖jar包
LICENSE 证书
licenses 证书
log 日志
NOTICE
opt
plugins 插件
README.txt 简介
- bin 常用脚本
flink
start-cluster.sh
stop-cluster.sh
yarn 模式 只会用到
flink
命令
- conf 配置
核心配置
flink-conf.yaml
masters //配置集群的leader
workers //配置集群leader的slave
zoo.cfg // 内置zookeeper的配置,不用关
masters与workers 对 yarn 模式无效,只有
Standalone
有效。
日志相关的,提供了两种方式log4j
与logback
,配置其中一种或者使用默认即可。
log4j-console.properties
log4j-session.properties
log4j-cli.properties
log4j.properties
logback-session.xml
logback-console.xml
logback.xml
local-cluster模式
Flink中的Local-cluster(本地集群)模式,主要用于
测试
,学习
。
不用改任何配置,直接启动
[admin@hadoop102 flink]$ bin/start-cluster.sh
[admin@hadoop102 flink]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.
访问web页面
overview
![](https://img.haomeiwen.com/i17367901/d4b5825882bfcdb7.png)
Task Managers
-
内存模型(Metrics)
是1.10之后新加的。
内存模型
Total Flink Memory 表示堆内内存
off-head Memory 表示堆外内存 -
日志(Logs)
日志信息
-
线程信息(Thread Dump)
线程信息
Job Managers
-
当前生效的配置
当前生效的配置
-
6123 内部通信端口
master 与 workers 的内部通信端口
![](https://img.haomeiwen.com/i17367901/bd5ce9c853c30fe2.png)
这些配置都能在conf
目录中找到。
Sumbit new Job
用于上传jar包,进运行
![](https://img.haomeiwen.com/i17367901/e88acabff92aa76a.png)
两种提交方式
- 编写程序
public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取由外部传入的文件地址
DataStreamSource<String> streamSource = env.readTextFile(args[0]);
// 处理文件中的每行数据
SingleOutputStreamOperator<Tuple2<String, Integer>> operator = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
// 切割并将数据收集
Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
}).returns(Types.TUPLE(Types.STRING, Types.INT));
// 计算每个单词的个数
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = operator.keyBy(0).sum(1);
// 打印
sum.print("test");
// 开启
env.execute();
}
- maven 打包
[INFO] --- maven-shade-plugin:3.2.4:shade (default) @ flink ---
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing D:\project\idea\flink\target\flink-1.0-SNAPSHOT.jar with D:\project\idea\flink\target\flink-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.965 s
[INFO] Finished at: 2021-08-06T19:24:12+08:00
[INFO] ------------------------------------------------------------------------
- 使用web 端提交
执行jar包
Entry class : 指定main
方法所的类路径
Parallelism:配置并行度,默认值为1
Program Arguments:main 方法传参
Savepoint Path:保存点
Show Plan :查看计划
Submit:提交
![](https://img.haomeiwen.com/i17367901/277b2af1f4a245e6.png)
![](https://img.haomeiwen.com/i17367901/0e2cd2a487d8117e.png)
- 使用 命令的方式提交
- 将jar包上传到服务器上(位置随意)
$ scp flink-1.0-SNAPSHOT.jar admin@hadoop102:/home/admin/document/jar/
admin@hadoop102's password:
flink-1.0-SNAPSHOT.jar 100% 12KB 3.7MB/s 00:00
- 运行
语法:bin/flink run -cmain的全路径
【空格】jar的路径【空格】<传入main方法中的参数>
flink]$ bin/flink run -c com.admin.flink.demo02.Demo001 /home/admin/document/jar/flink-1.0-SNAPSHOT.jar /home/admin/document/worldcount.txt
- 执行日志
Job has been submitted with JobID dcc27da257fb828443d45efced97755a
Program execution finished
Job with JobID dcc27da257fb828443d45efced97755a has finished.
Job Runtime: 80 ms
- 执行结果
test> (hello,1)
test> (java,1)
test> (python,1)
test> (count,1)
test> (java,2)
test> (hello,2)
test> (google,1)
test> (hello,3)
test> (count,2)
test> (sparke,1)
test> (python,2)
test> (count,3)
test> (java,3)
test> (shell,1)
test> (java,4)
test> (hello,1)
test> (java,1)
test> (python,1)
test> (count,1)
test> (java,2)
test> (hello,2)
test> (google,1)
test> (hello,3)
test> (count,2)
test> (sparke,1)
test> (python,2)
test> (count,3)
test> (java,3)
test> (shell,1)
test> (java,4)
疑问:对比上面的结果,为什么数据有这么多?这种模式有个缺点就是会保留历史结果。
也可以在web 端进行查看
- 停止
[admin@hadoop102 flink]$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 5694) on host hadoop102.
Stopping standalonesession daemon (pid: 5383) on host hadoop102.
网友评论