美文网首页
【7】安装Flink

【7】安装Flink

作者: 07b287742148 | 来源:发表于2018-10-30 00:01 被阅读12次

    Standalone Cluster

    环境

    1. JDK1.8
    2. 集群节点vm01,vm02,vm03
       主节点vm01,从节点vm02,vm03
    

    安装步骤

    1. 官网下载 flink 1.6.1
    2. 在主节点上解压flink,进入配置目录. flink/conf
    3. 配置主节点地址. flink.yaml
       jobmanager.rpc.address:vm01
    
    1. 配置从节点文件. slaves
        vm02
        vm03
    
    1. 将flink文件夹发送到各个从节点
    2. 启动集群,主节点进入flink的bin目录 flink/bin
        ./start-cluster.sh
        
        启动成功后可在各个节点敲jps查看flink相关进程
        主节点 StandaloneSessionClusterEntrypoint
        从节点 TaskManagerRunner
        
        也可在web页面访问:vm01:8081
    
    1. 运行流处理的wordcount示例代码,在主节点flink根目录下
    $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname vm01 --port 9999
    
    显示以下信息表示启动成功
    Starting execution of program
    
    监听端口,输入信息
    $ nc -l 9999
    hello
    word
    
    
    查看结果,注意是到从节点,也就是具体执行任务的节点上看
    $ tail -f flink/log/flink-*-taskexecutor-*-.out
    hello 1
    word 1
    
    1. 关闭flink,flink/bin
    $ ./stop-cluster.sh
    

    本地IDEA编程-Scala版本

    1. pom依赖
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.6.1</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.6.1</version>
        <scope>compile</scope>
    </dependency>
    
    1. scala代码
    package com.pein.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    
    object WindowCount {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val text = env.socketTextStream("vm01", 9999)
    
        val counts = text.flatMap{_.toLowerCase.split("\\W+")filter{_.nonEmpty}}
          .map{(_, 1)}
          .keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1)
    
        counts.print()
    
        env.execute("Window Stream WordCount")
    
      }
    }
    
    
    1. 主节点输入信息在idea上可看到对应的输出
    $ nc -l vm01 9999
    123
    123
    123
    1233
    1233
    1233
    1233
    
    #本地调试的时候在主节点nc的时候要加上hostname否则连接不上
    
    #对应输出信息
    2> (123,2)
    2> (123,1)
    4> (1233,1)
    4> (1233,3)
    

    相关文章

      网友评论

          本文标题:【7】安装Flink

          本文链接:https://www.haomeiwen.com/subject/dghwtqtx.html