美文网首页
Flink(1.13) 部署 local-cluster模式

Flink(1.13) 部署 local-cluster模式

作者: 万事万物 | 来源:发表于2021-08-16 13:42 被阅读0次

开发模式

使用 idea 编写 java 程序的方式运行 Flink程序方式就是开发模式

安装

程序我们肯定不会在本地运行,所以需要配置服务器

  • 上传安装包
$ scp flink-1.13.1-bin-scala_2.12.tgz admin@hadoop102:/opt/software
admin@hadoop102's password:
flink-1.13.1-bin-scala_2.12.tgz               100%  291MB  64.8MB/s   00:04
  • 解压
tar -zxvf flink-1.13.1-bin-scala_2.12.tgz -C /opt/module/
  • 切换目录
cd /opt/module/
  • 重命名
 mv flink-1.13.1/ flink

Flink 配置

  • 根目录
bin  程序脚本
conf  配置文件
examples  官方demo
lib  依赖jar包
LICENSE  证书
licenses  证书
log  日志
NOTICE  
opt  
plugins  插件
README.txt 简介
  • bin 常用脚本
flink
start-cluster.sh
stop-cluster.sh

yarn 模式 只会用到flink命令

  • conf 配置

核心配置

flink-conf.yaml         
masters  //配置集群的leader 
workers //配置集群leader的slave
zoo.cfg     // 内置zookeeper的配置,不用关

masters与workers 对 yarn 模式无效,只有 Standalone有效。

日志相关的,提供了两种方式log4jlogback,配置其中一种或者使用默认即可。

log4j-console.properties 
log4j-session.properties  
log4j-cli.properties  
log4j.properties       
logback-session.xml     
logback-console.xml       
logback.xml   

local-cluster模式

Flink中的Local-cluster(本地集群)模式,主要用于测试, 学习

不用改任何配置,直接启动
[admin@hadoop102 flink]$ bin/start-cluster.sh

[admin@hadoop102 flink]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.

访问web页面

http://hadoop102:8081/

overview

主界面

Task Managers

  • 内存模型(Metrics)
    是1.10之后新加的。


    内存模型

    Total Flink Memory 表示堆内内存
    off-head Memory 表示堆外内存

  • 日志(Logs)


    日志信息
  • 线程信息(Thread Dump)


    线程信息

Job Managers

  • 当前生效的配置


    当前生效的配置
  • 6123 内部通信端口

master 与 workers 的内部通信端口

内部通信端口

这些配置都能在conf 目录中找到。

Sumbit new Job

用于上传jar包,进运行

sumbit

两种提交方式

  • 编写程序
    public static void main(String[] args) throws Exception {

        // 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 读取由外部传入的文件地址
        DataStreamSource<String> streamSource = env.readTextFile(args[0]);
        
        // 处理文件中的每行数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> operator = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // 切割并将数据收集
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // 计算每个单词的个数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = operator.keyBy(0).sum(1);

        // 打印
        sum.print("test");

        // 开启
        env.execute();
    }
  • maven 打包
[INFO] --- maven-shade-plugin:3.2.4:shade (default) @ flink ---
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing D:\project\idea\flink\target\flink-1.0-SNAPSHOT.jar with D:\project\idea\flink\target\flink-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.965 s
[INFO] Finished at: 2021-08-06T19:24:12+08:00
[INFO] ------------------------------------------------------------------------
  • 使用web 端提交
执行jar包
Entry class : 指定 main 方法所的类路径
Parallelism:配置并行度,默认值为1
Program Arguments:main 方法传参
Savepoint Path:保存点
Show Plan :查看计划
Submit:提交
执行流程
运行结果
  • 使用 命令的方式提交
  1. 将jar包上传到服务器上(位置随意)
$ scp flink-1.0-SNAPSHOT.jar admin@hadoop102:/home/admin/document/jar/
admin@hadoop102's password:
flink-1.0-SNAPSHOT.jar                        100%   12KB   3.7MB/s   00:00
  1. 运行
    语法:bin/flink run -c main的全路径【空格】jar的路径【空格】<传入main方法中的参数>
 flink]$ bin/flink run -c com.admin.flink.demo02.Demo001 /home/admin/document/jar/flink-1.0-SNAPSHOT.jar /home/admin/document/worldcount.txt
  1. 执行日志
Job has been submitted with JobID dcc27da257fb828443d45efced97755a
Program execution finished
Job with JobID dcc27da257fb828443d45efced97755a has finished.
Job Runtime: 80 ms
  1. 执行结果
test> (hello,1)
test> (java,1)
test> (python,1)
test> (count,1)
test> (java,2)
test> (hello,2)
test> (google,1)
test> (hello,3)
test> (count,2)
test> (sparke,1)
test> (python,2)
test> (count,3)
test> (java,3)
test> (shell,1)
test> (java,4)
test> (hello,1)
test> (java,1)
test> (python,1)
test> (count,1)
test> (java,2)
test> (hello,2)
test> (google,1)
test> (hello,3)
test> (count,2)
test> (sparke,1)
test> (python,2)
test> (count,3)
test> (java,3)
test> (shell,1)
test> (java,4)

疑问:对比上面的结果,为什么数据有这么多?这种模式有个缺点就是会保留历史结果。

也可以在web 端进行查看

  • 停止
[admin@hadoop102 flink]$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 5694) on host hadoop102.
Stopping standalonesession daemon (pid: 5383) on host hadoop102.

相关文章

网友评论

      本文标题:Flink(1.13) 部署 local-cluster模式

      本文链接:https://www.haomeiwen.com/subject/fyrivltx.html