Reading and writing files with Flink's batch API is simple; below is a complete word-count example.
1. Code
package org.bbk.flink

import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object Demo {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\tmp\\count.txt"
    val outputPath = "D:\\tmp\\result.txt"

    // Parameters for the file input format; recursive enumeration is
    // left disabled because we read a single file, not a directory tree.
    val configuration: Configuration = new Configuration()
    configuration.setBoolean("recursive.file.enumeration", false)

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read the input file line by line, passing the configuration
    // through to the underlying input format.
    val text = env.readTextFile(inputPath).withParameters(configuration)

    // Classic word count: split each line on spaces, emit (word, 1),
    // group by the word (field 0) and sum the counts (field 1).
    val value: AggregateDataSet[(String, Int)] = text
      .flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .groupBy(0)
      .sum(1)

    // Write the result as a single file (parallelism 1), then run the job.
    value.writeAsText(outputPath).setParallelism(1)
    env.execute("batch word count")
  }
}
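As a side note, the recursive.file.enumeration parameter only takes effect when the input path is a directory rather than a single file. A minimal sketch of reading every file under a nested directory, assuming a hypothetical path D:\tmp\logs, would flip the flag to true:

// Sketch: recursively enumerate all files under a directory (hypothetical path).
val recursiveConf = new Configuration()
recursiveConf.setBoolean("recursive.file.enumeration", true)
val allLines = env.readTextFile("D:\\tmp\\logs").withParameters(recursiveConf)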
2. Input file
001 1569895882000
001 1569895885000
001 1569895888000
001 1569895890000
001 1569895891000
001 1569895895000
001 1569895898000
001 1569895900000
001 1569895911000
001 1569895948000
001 1569895945000
001 1569895947000
001 1569895950000
001 1569895960000
001 1569895949000
3. Output file
(1569895945000,1)
(1569895948000,1)
(1569895960000,1)
(1569895882000,1)
(1569895895000,1)
(1569895947000,1)
(1569895888000,1)
(1569895890000,1)
(1569895898000,1)
(1569895949000,1)
(1569895885000,1)
(1569895900000,1)
(1569895950000,1)
(001,15)
(1569895891000,1)
(1569895911000,1)
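Note the (001,15) entry: because each line is split on a space, both the ID column and the timestamp column are counted as words. All 15 input lines share the ID 001, while every timestamp is distinct, so the ID appears with a count of 15 and each timestamp with a count of 1. If only the per-ID counts were wanted, a small variant (a sketch, not part of the original code) could keep just the first column:

// Sketch: count occurrences of the first column (the ID) only.
val idCounts = text
  .map(line => (line.split(" ")(0), 1))
  .groupBy(0)
  .sum(1)
idCounts.writeAsText(outputPath).setParallelism(1)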