Flink offline (batch) processing demo


Author: 万州客 | Published 2022-05-07 16:57

    A simple demo: read the text of a file and count how often each space-separated token occurs.

    1. Code

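    package org.bbk.flink

    // The wildcard import provides ExecutionEnvironment, DataSet, AggregateDataSet
    // and the implicit TypeInformation instances that flatMap/map need.
    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode

    object Demo {
      def main(args: Array[String]): Unit = {
        // Batch (DataSet API) execution environment
        val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
        // Read the input file line by line
        val fileDataSet: DataSet[String] = environment.readTextFile("D:\\tmp\\adult.txt", "utf-8")
        val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet
          .flatMap(x => x.split(" ")) // split each line on single spaces
          .map(x => (x, 1))           // pair each token with a count of 1
          .groupBy(0)                 // group by the token (tuple field 0)
          .sum(1)                     // sum the counts (tuple field 1)
        // Write the (token, count) pairs, overwriting any previous output
        resultDataSet.writeAsText("D:\\tmp\\output", WriteMode.OVERWRITE)
        // Sinks are lazy; execute() actually runs the job
        environment.execute()
      }
    }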
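
To make the pipeline's logic easier to follow without a Flink runtime, here is a minimal sketch of the same flatMap, map, group, and sum steps using plain Scala collections. The names `WordCountSketch` and `countWords` and the sample lines are hypothetical, chosen only for illustration; Flink's `groupBy(0).sum(1)` is approximated here by a collection `groupBy` followed by a per-group sum.

```scala
object WordCountSketch {
  // Plain-collections stand-in for flatMap -> map -> groupBy(0) -> sum(1).
  // Hypothetical helper, not part of the original demo.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))  // one element per space-separated token
      .map(token => (token, 1))          // pair each token with a count of 1
      .groupBy(_._1)                     // group the pairs by token (field 0)
      .map { case (token, pairs) => (token, pairs.map(_._2).sum) } // sum field 1

  def main(args: Array[String]): Unit = {
    val sample = Seq("df df this", "this df")
    // Sort the pairs so the printed order is stable
    countWords(sample).toSeq.sorted.foreach(println)
  }
}
```

Each printed pair has the same `(token,count)` shape as the lines Flink writes to its output files.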
    

    2. Output

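    This is only one of the output files; there are ten in total. Flink writes one file per parallel sink task, so the count presumably matches the job's parallelism on the machine it ran on.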

    (,120)
    ((fe28c0c5280e26eede4948ef1f5048f1),4)
    (13:00:12,233,1)
    (13:00:13,541,1)
    (13:00:13,587,1)
    (CountWord(df,2),1)
    (CountWord(df,4),1)
    (Out,3)
    (Print,3)
    (socket,1)
    (state,1)
    (switched,7)
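
The `(,120)` entry above is the empty string counted 120 times: splitting on a single space yields an empty token wherever two spaces are adjacent, as in the padded log lines. Likewise `(13:00:12,233,1)` is the token `13:00:12,233` with a count of 1. As a rough sketch of one way to avoid the empty token, the hypothetical helper `tokens` below (not part of the original demo) splits on runs of whitespace and drops empty strings.

```scala
object TokenizeSketch {
  // Hypothetical helper: split on runs of whitespace and drop empty tokens.
  def tokens(line: String): Seq[String] =
    line.split("\\s+").toSeq.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val line = "taskmanager_1  | CountWord(df,2)"
    // Splitting on a single space keeps an empty string between the two spaces
    println(line.split(" ").toSeq)
    // Splitting on whitespace runs yields only the non-empty tokens
    println(tokens(line))
  }
}
```

In the Flink job the same idea would amount to changing the function passed to `flatMap`; the output shown above was produced by the original single-space split.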
    
    

    3. Source file

    taskmanager_1  | 2022-05-05 13:00:12,233 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
    jobmanager_1   | 2022-05-05 13:00:12,241 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from DEPLOYING to INITIALIZING.
    jobmanager_1   | 2022-05-05 13:00:12,243 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
    taskmanager_1  | 2022-05-05 13:00:13,541 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
    taskmanager_1  | 2022-05-05 13:00:13,548 INFO  org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction [] - Connecting to server socket 192.168.1.111:9000
    jobmanager_1   | 2022-05-05 13:00:13,548 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
    taskmanager_1  | 2022-05-05 13:00:13,561 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
    taskmanager_1  | 2022-05-05 13:00:13,569 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] - Initializing heap keyed state backend with stream factory.
    taskmanager_1  | 2022-05-05 13:00:13,583 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1)#0 (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
    jobmanager_1   | 2022-05-05 13:00:13,587 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
    taskmanager_1  | CountWord(ytyu,1)
    taskmanager_1  | CountWord(dddd,1)
    taskmanager_1  | CountWord(this,1)
    taskmanager_1  | CountWord(2222,1)
    taskmanager_1  | CountWord(this,2)
    taskmanager_1  | CountWord(this,3)
    taskmanager_1  | CountWord(2222,2)
    taskmanager_1  | CountWord(df,1)
    taskmanager_1  | CountWord(df,2)
    taskmanager_1  | CountWord(df,3)
    taskmanager_1  | CountWord(df,4)
    taskmanager_1  | CountWord(df,5)
    
    

