很简单：从一个文本文件中读取内容，按空白切分成单词并统计每个单词出现的次数。
一、代码
package org.bbk.flink
import org.apache.flink.api.scala.{AggregateDataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode
object Demo {

  /** Input file read when no first program argument is given. */
  private val DefaultInputPath = "D:\\tmp\\adult.txt"

  /** Output location written when no second program argument is given. */
  private val DefaultOutputPath = "D:\\tmp\\output"

  /**
   * Splits one line of text into words.
   *
   * Splits on runs of whitespace and drops empty tokens, so repeated spaces,
   * leading padding, tabs and blank lines do not produce "" entries.
   * Splitting on a single " " instead yields an empty token for every extra
   * space, which then shows up as a large count for the empty string.
   *
   * @param line one line of the input file
   * @return the non-empty whitespace-separated tokens, in their original order
   */
  def tokenize(line: String): Array[String] =
    line.split("\\s+").filter(_.nonEmpty)

  /**
   * Batch word count: reads a text file, counts every whitespace-separated
   * token, and writes the resulting `(token, count)` tuples as text.
   *
   * @param args optional; `args(0)` is the input path and `args(1)` the output
   *             path. Missing arguments fall back to DefaultInputPath /
   *             DefaultOutputPath.
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val outputPath = args.drop(1).headOption.getOrElse(DefaultOutputPath)

    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Wildcard import of the Flink Scala API: provides the DataSet type used
    // below and the implicit type information its operators require.
    import org.apache.flink.api.scala._

    val fileDataSet: DataSet[String] = environment.readTextFile(inputPath, "utf-8")
    val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet
      .flatMap(line => tokenize(line))
      .map(word => (word, 1))
      .groupBy(0) // key: the word (tuple field 0)
      .sum(1)     // aggregate: the per-word counts (tuple field 1)

    // Replaces any previous result. When the sink runs with parallelism > 1,
    // this path becomes a directory containing one file per parallel subtask.
    resultDataSet.writeAsText(outputPath, WriteMode.OVERWRITE)
    environment.execute()
  }
}
二、输出
下面只是其中一个输出文件的内容。由于输出算子的并行度大于 1，每个并行子任务各写一个文件，输出目录下一共生成了十个文件。
(,120)
((fe28c0c5280e26eede4948ef1f5048f1),4)
(13:00:12,233,1)
(13:00:13,541,1)
(13:00:13,587,1)
(CountWord(df,2),1)
(CountWord(df,4),1)
(Out,3)
(Print,3)
(socket,1)
(state,1)
(switched,7)
三、原文件（输入数据）
taskmanager_1 | 2022-05-05 13:00:12,233 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
jobmanager_1 | 2022-05-05 13:00:12,241 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from DEPLOYING to INITIALIZING.
jobmanager_1 | 2022-05-05 13:00:12,243 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from DEPLOYING to INITIALIZING.
taskmanager_1 | 2022-05-05 13:00:13,541 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Socket Stream -> Flat Map -> Map (1/1)#0 (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
taskmanager_1 | 2022-05-05 13:00:13,548 INFO org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction [] - Connecting to server socket 192.168.1.111:9000
jobmanager_1 | 2022-05-05 13:00:13,548 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Socket Stream -> Flat Map -> Map (1/1) (fe28c0c5280e26eede4948ef1f5048f1) switched from INITIALIZING to RUNNING.
taskmanager_1 | 2022-05-05 13:00:13,561 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
taskmanager_1 | 2022-05-05 13:00:13,569 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
taskmanager_1 | 2022-05-05 13:00:13,583 INFO org.apache.flink.runtime.taskmanager.Task [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1)#0 (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
jobmanager_1 | 2022-05-05 13:00:13,587 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Keyed Aggregation -> Sink: Print to Std. Out (1/1) (e2dc213af028a1599dff957fa128a489) switched from INITIALIZING to RUNNING.
taskmanager_1 | CountWord(ytyu,1)
taskmanager_1 | CountWord(dddd,1)
taskmanager_1 | CountWord(this,1)
taskmanager_1 | CountWord(2222,1)
taskmanager_1 | CountWord(this,2)
taskmanager_1 | CountWord(this,3)
taskmanager_1 | CountWord(2222,2)
taskmanager_1 | CountWord(df,1)
taskmanager_1 | CountWord(df,2)
taskmanager_1 | CountWord(df,3)
taskmanager_1 | CountWord(df,4)
taskmanager_1 | CountWord(df,5)
2022-05-05 22_17_18-MessageCenterUI.png
2022-05-05 22_17_03-MessageCenterUI.png
网友评论