
Flink DataSet: File Data Source

Author: 万州客 | Published 2022-05-10 08:10

Reading files into a DataSet and writing results back out is simple. The example below reads a text file, runs a word count on it, and writes the result to an output file.

1. Code

package org.bbk.flink

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object Demo {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\tmp\\count.txt"
    val outputPath = "D:\\tmp\\result.txt"

    // Configuration handed to the file source; recursive enumeration of
    // nested directories is left disabled here (false is also the default).
    val configuration: Configuration = new Configuration()
    configuration.setBoolean("recursive.file.enumeration", false)

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read the input file line by line as a DataSet[String].
    val text = env.readTextFile(inputPath).withParameters(configuration)

    // Word count: split each line on spaces, emit (word, 1),
    // group by the word and sum the counts.
    val value: AggregateDataSet[(String, Int)] = text
      .flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .groupBy(0)
      .sum(1)

    // Write the result to a single file and run the batch job.
    value.writeAsText(outputPath).setParallelism(1)
    env.execute("batch word count")
  }
}
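
The configuration key recursive.file.enumeration controls whether readTextFile also enumerates files in nested subdirectories when the input path points to a directory; the example above leaves it at false. A minimal sketch of the opposite case, assuming a hypothetical directory D:\tmp\logs that contains nested subfolders:

// assumption: D:\tmp\logs is a hypothetical directory with nested subfolders
val recursiveConf = new Configuration()
// enable enumeration of files in subdirectories for this source
recursiveConf.setBoolean("recursive.file.enumeration", true)
val allLines = env.readTextFile("D:\\tmp\\logs").withParameters(recursiveConf)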


2. Input file

001 1569895882000
001 1569895885000
001 1569895888000
001 1569895890000
001 1569895891000
001 1569895895000
001 1569895898000
001 1569895900000
001 1569895911000
001 1569895948000
001 1569895945000
001 1569895947000
001 1569895950000
001 1569895960000
001 1569895949000

3. Output file

(1569895945000,1)
(1569895948000,1)
(1569895960000,1)
(1569895882000,1)
(1569895895000,1)
(1569895947000,1)
(1569895888000,1)
(1569895890000,1)
(1569895898000,1)
(1569895949000,1)
(1569895885000,1)
(1569895900000,1)
(1569895950000,1)
(001,15)
(1569895891000,1)
(1569895911000,1)
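
In the output, "001" appears on every one of the 15 input lines, hence (001,15), while each timestamp occurs exactly once. Since every line is a space-separated (id, timestamp) pair, the same file could also be read as typed tuples instead of raw text. A minimal sketch using readCsvFile with a space field delimiter (treating the second field as Long is an assumption about the data):

// read the same space-delimited file as typed (id, timestamp) tuples
// assumption: every line has exactly two fields, id: String and timestamp: Long
val pairs = env.readCsvFile[(String, Long)](inputPath, fieldDelimiter = " ")
pairs.first(3).print()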

