Flink WC 案例

Author: 喵星人ZC | Published 2019-07-05 23:08

    1. pom.xml

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>${flink.version}</version>
        </dependency>
    
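`${flink.version}` is a Maven property that must be defined in the POM's `<properties>` block. Judging from the `flink-1.6.4` installation used in the test section below, it would presumably look like:

```xml
<properties>
  <!-- Version inferred from the flink-1.6.4 directory used later in this post -->
  <flink.version>1.6.4</flink.version>
</properties>
```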
    

    2. Code

    package com.soul.flink
    
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
      * @author soulChun
      * @create 2019-06-30-23:20
      */
    object SocketWindowWordCount {
      def main(args: Array[String]): Unit = {
    
        // the port to connect to
        val port: Int = try {
          ParameterTool.fromArgs(args).getInt("port")
        } catch {
          case e: Exception => {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
            return
          }
        }
    
        // get the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // get input data by connecting to the socket
        val text = env.socketTextStream("hadoop000", port, '\n')
    
        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
          .flatMap { w => w.split("\\s") } // NOTE: "\\s" matches a single whitespace char, so runs of spaces/tabs produce empty tokens ("\\s+" avoids this)
          .map { w => WordWithCount(w, 1) }
          .keyBy("word")
          .timeWindow(Time.seconds(5), Time.seconds(1)) // sliding window: 5s size, 1s slide
          .sum("count")
    
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
      }
    
      // Data type for words with count
      case class WordWithCount(word: String, count: Long)
    
    }
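The `timeWindow(Time.seconds(5), Time.seconds(1))` call creates a sliding window of size 5 seconds that advances every 1 second, so each element is counted in 5 overlapping windows. As a rough sketch (not Flink's actual implementation, just the same alignment arithmetic its sliding window assigner uses with zero offset; the millisecond values are illustrative):

```scala
// Sketch: which sliding windows (start, end) contain an element at time ts?
object SlidingWindowSketch {
  def assignWindows(ts: Long, size: Long, slide: Long): Seq[(Long, Long)] = {
    // Start of the latest window containing ts, aligned to the slide interval
    // (the double modulo also handles negative timestamps).
    val lastStart = ts - (ts % slide + slide) % slide
    // Walk backwards one slide at a time while the window still covers ts.
    (lastStart to (ts - size + 1) by -slide).map(start => (start, start + size))
  }

  def main(args: Array[String]): Unit = {
    // An element arriving at t = 7200 ms with size = 5000 ms, slide = 1000 ms
    // falls into 5 windows: (7000,12000), (6000,11000), ..., (3000,8000)
    assignWindows(7200, 5000, 1000).foreach(println)
  }
}
```

This overlap is why each word shows up several times in the output later in this post: every input line is re-counted by each of the five windows that cover it.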
    

    3. Testing

    ~/soul/app/flink-1.6.4 » start-cluster.sh
    

    Check that the web UI opens at hadoop000:8081.



    Run

    Start nc:

     nc -l 9000 
    

    Run the program:

     flink run -c com.soul.flink.SocketWindowWordCount /Users/mac/IdeaProjects/flink-train/target/flink-train-1.0.jar --port 9000
    

    Enter some values in the nc window:

    » nc -l 9000                                                                                                   mac@hadoop000
    spark   spark   spark
    spark   spark   spark
    hadoop  hadoop  hadoop    
    a       a       a
    

    Check the results:

    ~/soul/app/flink-1.6.4 » tail -f log/flink-*-taskexecutor-*.out                                                  mac@hadoop000
    
    ==> log/flink-mac-taskexecutor-0-hadoop000.out <==
    WordWithCount(spark,3)
    WordWithCount(,4)
    WordWithCount(,4)
    WordWithCount(spark,3)
    WordWithCount(spark,3)
    WordWithCount(,4)
    WordWithCount(,4)
    WordWithCount(spark,3)
    WordWithCount(spark,3)
    WordWithCount(,4)
    
    ==> log/flink-mac-taskexecutor-1-hadoop000.out <==
    
    ==> log/flink-mac-taskexecutor-0-hadoop000.out <==
    WordWithCount(hadoop,3)
    WordWithCount(hadoop,3)
    WordWithCount(hadoop,3)
    WordWithCount(hadoop,3)
    WordWithCount(hadoop,3)
    WordWithCount(a,3)
    WordWithCount(a,3)
    WordWithCount(a,3)
    WordWithCount(a,3)
    WordWithCount(a,3)
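The `WordWithCount(,4)` entries above are counts for the empty word: `split("\\s")` splits on every single whitespace character, so the runs of spaces and tabs in the input produce empty tokens. A quick standalone check of the difference between `"\\s"` and `"\\s+"`:

```scala
// Demonstrates why consecutive whitespace yields empty tokens with "\\s"
object SplitDemo {
  def main(args: Array[String]): Unit = {
    val line = "spark   spark"
    println(line.split("\\s").toList)   // List(spark, , , spark) -- two empty tokens
    println(line.split("\\s+").toList)  // List(spark, spark)
  }
}
```

Switching the job's flatMap to `w.split("\\s+")`, or filtering out empty tokens, would remove those entries.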
    

    Original: https://www.haomeiwen.com/subject/qboyhctx.html