美文网首页
Flink(1.13) 部署 local-cluster模式

Flink(1.13) 部署 local-cluster模式

作者: 万事万物 | 来源:发表于2021-08-16 13:42 被阅读0次

    开发模式

    使用 idea 编写 java 程序的方式运行 Flink程序方式就是开发模式

    安装

    程序我们肯定不会在本地运行,所以需要配置服务器

    • 上传安装包
    $ scp flink-1.13.1-bin-scala_2.12.tgz admin@hadoop102:/opt/software
    admin@hadoop102's password:
    flink-1.13.1-bin-scala_2.12.tgz               100%  291MB  64.8MB/s   00:04
    
    • 解压
    tar -zxvf flink-1.13.1-bin-scala_2.12.tgz -C /opt/module/
    
    • 切换目录
    cd /opt/module/
    
    • 重命名
     mv flink-1.13.1/ flink
    

    Flink 配置

    • 根目录
    bin  程序脚本
    conf  配置文件
    examples  官方demo
    lib  依赖jar包
    LICENSE  证书
    licenses  证书
    log  日志
    NOTICE  
    opt  
    plugins  插件
    README.txt 简介
    
    • bin 常用脚本
    flink
    start-cluster.sh
    stop-cluster.sh
    

    yarn 模式 只会用到flink命令

    • conf 配置

    核心配置

    flink-conf.yaml         
    masters  //配置集群的leader 
    workers //配置集群leader的slave
    zoo.cfg     // 内置zookeeper的配置,不用关
    

    masters与workers 对 yarn 模式无效,只有 Standalone有效。

    日志相关的,提供了两种方式log4jlogback,配置其中一种或者使用默认即可。

    log4j-console.properties 
    log4j-session.properties  
    log4j-cli.properties  
    log4j.properties       
    logback-session.xml     
    logback-console.xml       
    logback.xml   
    

    local-cluster模式

    Flink中的Local-cluster(本地集群)模式,主要用于测试, 学习

    不用改任何配置,直接启动
    [admin@hadoop102 flink]$ bin/start-cluster.sh

    [admin@hadoop102 flink]$ bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host hadoop102.
    Starting taskexecutor daemon on host hadoop102.
    

    访问web页面

    http://hadoop102:8081/

    overview

    主界面

    Task Managers

    • 内存模型(Metrics)
      是1.10之后新加的。


      内存模型

      Total Flink Memory 表示堆内内存
      off-head Memory 表示堆外内存

    • 日志(Logs)


      日志信息
    • 线程信息(Thread Dump)


      线程信息

    Job Managers

    • 当前生效的配置


      当前生效的配置
    • 6123 内部通信端口

    master 与 workers 的内部通信端口

    内部通信端口

    这些配置都能在conf 目录中找到。

    Sumbit new Job

    用于上传jar包,进运行

    sumbit

    两种提交方式

    • 编写程序
        public static void main(String[] args) throws Exception {
    
            // 创建环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 读取由外部传入的文件地址
            DataStreamSource<String> streamSource = env.readTextFile(args[0]);
            
            // 处理文件中的每行数据
            SingleOutputStreamOperator<Tuple2<String, Integer>> operator = streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                // 切割并将数据收集
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            }).returns(Types.TUPLE(Types.STRING, Types.INT));
    
            // 计算每个单词的个数
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = operator.keyBy(0).sum(1);
    
            // 打印
            sum.print("test");
    
            // 开启
            env.execute();
        }
    
    • maven 打包
    [INFO] --- maven-shade-plugin:3.2.4:shade (default) @ flink ---
    [INFO] Replacing original artifact with shaded artifact.
    [INFO] Replacing D:\project\idea\flink\target\flink-1.0-SNAPSHOT.jar with D:\project\idea\flink\target\flink-1.0-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 3.965 s
    [INFO] Finished at: 2021-08-06T19:24:12+08:00
    [INFO] ------------------------------------------------------------------------
    
    • 使用web 端提交
    执行jar包
    Entry class : 指定 main 方法所的类路径
    Parallelism:配置并行度,默认值为1
    Program Arguments:main 方法传参
    Savepoint Path:保存点
    Show Plan :查看计划
    Submit:提交
    执行流程
    运行结果
    • 使用 命令的方式提交
    1. 将jar包上传到服务器上(位置随意)
    $ scp flink-1.0-SNAPSHOT.jar admin@hadoop102:/home/admin/document/jar/
    admin@hadoop102's password:
    flink-1.0-SNAPSHOT.jar                        100%   12KB   3.7MB/s   00:00
    
    1. 运行
      语法:bin/flink run -c main的全路径【空格】jar的路径【空格】<传入main方法中的参数>
     flink]$ bin/flink run -c com.admin.flink.demo02.Demo001 /home/admin/document/jar/flink-1.0-SNAPSHOT.jar /home/admin/document/worldcount.txt
    
    1. 执行日志
    Job has been submitted with JobID dcc27da257fb828443d45efced97755a
    Program execution finished
    Job with JobID dcc27da257fb828443d45efced97755a has finished.
    Job Runtime: 80 ms
    
    1. 执行结果
    test> (hello,1)
    test> (java,1)
    test> (python,1)
    test> (count,1)
    test> (java,2)
    test> (hello,2)
    test> (google,1)
    test> (hello,3)
    test> (count,2)
    test> (sparke,1)
    test> (python,2)
    test> (count,3)
    test> (java,3)
    test> (shell,1)
    test> (java,4)
    test> (hello,1)
    test> (java,1)
    test> (python,1)
    test> (count,1)
    test> (java,2)
    test> (hello,2)
    test> (google,1)
    test> (hello,3)
    test> (count,2)
    test> (sparke,1)
    test> (python,2)
    test> (count,3)
    test> (java,3)
    test> (shell,1)
    test> (java,4)
    

    疑问:对比上面的结果,为什么数据有这么多?这种模式有个缺点就是会保留历史结果。

    也可以在web 端进行查看

    • 停止
    [admin@hadoop102 flink]$ bin/stop-cluster.sh
    Stopping taskexecutor daemon (pid: 5694) on host hadoop102.
    Stopping standalonesession daemon (pid: 5383) on host hadoop102.
    

    相关文章

      网友评论

          本文标题:Flink(1.13) 部署 local-cluster模式

          本文链接:https://www.haomeiwen.com/subject/fyrivltx.html