
Flink offline processing demo

Author: 万州客 | Published 2022-05-07 16:57

It's simple: read the text from a file and compute a word count over it.

1. Code

package org.bbk.flink

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object Demo {
  def main(args: Array[String]): Unit = {
    // Batch (DataSet API) execution environment.
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Brings the implicit TypeInformation instances required by the Scala API into scope.
    import org.apache.flink.api.scala._
    // Read the input file line by line as UTF-8 text.
    val fileDataSet: DataSet[String] = environment.readTextFile("D:\\tmp\\adult.txt", "utf-8")
    // Split each line on spaces, emit (word, 1), group by the word and sum the counts.
    val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet
      .flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .groupBy(0)
      .sum(1)
    // Write the counts to the output directory, overwriting any previous run, then run the job.
    resultDataSet.writeAsText("D:\\tmp\\output", WriteMode.OVERWRITE)
    environment.execute()
  }
}
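
The input log lines contain runs of consecutive spaces, so splitting on a single space also emits empty strings; they show up in the output below as the (,120) entry. The following is only a sketch of a variant, not part of the original demo (the object name DemoNoEmptyTokens is my own, the paths are the demo's), that drops those empty tokens before counting:

package org.bbk.flink

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

// Sketch only: same job as Demo above, but empty tokens are filtered out,
// so the (,120) entry seen in the sample output would disappear.
object DemoNoEmptyTokens {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val fileDataSet: DataSet[String] = environment.readTextFile("D:\\tmp\\adult.txt", "utf-8")
    val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet
      .flatMap(x => x.split(" "))
      .filter(_.nonEmpty)   // consecutive spaces produce empty strings; drop them
      .map(x => (x, 1))
      .groupBy(0)
      .sum(1)
    resultDataSet.writeAsText("D:\\tmp\\output", WriteMode.OVERWRITE)
    environment.execute()
  }
}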

2. Output

This is just one of the output files; there are ten part files in total (a sketch for writing a single file follows the sample output below).

(,120)
((fe28c0c5280e26eede4948ef1f5048f1),4)
(13:00:12,233,1)
(13:00:13,541,1)
(13:00:13,587,1)
(CountWord(df,2),1)
(CountWord(df,4),1)
(Out,3)
(Print,3)
(socket,1)
(state,1)
(switched,7)
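
writeAsText creates one file per sink subtask, so with the default parallelism the counts end up spread across several part files (ten in this run). Below is a sketch of my own, not from the original post (the object name DemoSingleFile is hypothetical, the paths are the demo's), that sets the sink parallelism to 1 so all counts land in a single output file:

package org.bbk.flink

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

// Sketch only: identical word count, but the sink runs with parallelism 1,
// so D:\tmp\output is produced as a single file instead of ten part files.
object DemoSingleFile {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val fileDataSet: DataSet[String] = environment.readTextFile("D:\\tmp\\adult.txt", "utf-8")
    val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet
      .flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .groupBy(0)
      .sum(1)
    // setParallelism on the sink restricts only the write step to a single subtask.
    resultDataSet.writeAsText("D:\\tmp\\output", WriteMode.OVERWRITE).setParallelism(1)
    environment.execute()
  }
}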

3. Input file

taskmanager_1  | 2022-05-05 13:00:12,233 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
jobmanager_1   | 2022-05-05 13:00:12,241 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from DEPLOYING to INITIALIZING.
jobmanager_1   | 2022-05-05 13:00:12,243 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
taskmanager_1  | 2022-05-05 13:00:13,541 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
taskmanager_1  | 2022-05-05 13:00:13,548 INFO  org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction [] - Connecting to server socket 192.168.1.111:9000
jobmanager_1   | 2022-05-05 13:00:13,548 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
taskmanager_1  | 2022-05-05 13:00:13,561 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
taskmanager_1  | 2022-05-05 13:00:13,569 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] - Initializing heap keyed state backend with stream factory.
taskmanager_1  | 2022-05-05 13:00:13,583 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1)#0 (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
jobmanager_1   | 2022-05-05 13:00:13,587 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
taskmanager_1  | CountWord(ytyu,1)
taskmanager_1  | CountWord(dddd,1)
taskmanager_1  | CountWord(this,1)
taskmanager_1  | CountWord(2222,1)
taskmanager_1  | CountWord(this,2)
taskmanager_1  | CountWord(this,3)
taskmanager_1  | CountWord(2222,2)
taskmanager_1  | CountWord(df,1)
taskmanager_1  | CountWord(df,2)
taskmanager_1  | CountWord(df,3)
taskmanager_1  | CountWord(df,4)
taskmanager_1  | CountWord(df,5)

