1. pom.xml
<!-- ${flink.version} must be defined in the pom's <properties> section;
     1.6.4 matches the cluster used in the test below -->
<properties>
    <flink.version>1.6.4</flink.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
2. Code
package com.soul.flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * @author soulChun
  * @create 2019-06-30-23:20
  */
object SocketWindowWordCount {

  def main(args: Array[String]): Unit = {

    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
    }

    // get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("hadoop000", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    // (note: split("\\s") can emit empty strings; see the note after this program)
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
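One caveat about the parsing step: split("\\s") produces an empty string for every stretch of leading or consecutive whitespace in the input, and those empty strings get keyed and counted like real words. That is where the WordWithCount(,4) lines in the test output below come from. A minimal sketch of the same pipeline with the empty tokens dropped (one extra filter call, everything else unchanged):

// same pipeline as above, but skip the empty strings that
// split("\\s") emits for leading or consecutive whitespace
val windowCounts = text
  .flatMap { w => w.split("\\s") }
  .filter(_.nonEmpty)  // empty tokens are never keyed or counted
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5), Time.seconds(1))
  .sum("count")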
3. Testing
~/soul/app/flink-1.6.4 » start-cluster.sh
Check that the web UI opens normally at hadoop000:8081.

Run
Start nc:
nc -l 9000
Run the program (the entry class is passed with -c / --class; --port is consumed by ParameterTool in main):
flink run -c com.soul.flink.SocketWindowWordCount /Users/mac/IdeaProjects/flink-train/target/flink-train-1.0.jar --port 9000
Enter some lines in the nc window:
» nc -l 9000
spark spark spark
spark spark spark
hadoop hadoop hadoop
a a a
Check the results:
~/soul/app/flink-1.6.4 » tail -f log/flink-*-taskexecutor-*.out
==> log/flink-mac-taskexecutor-0-hadoop000.out <==
WordWithCount(spark,3)
WordWithCount(,4)
WordWithCount(,4)
WordWithCount(spark,3)
WordWithCount(spark,3)
WordWithCount(,4)
WordWithCount(,4)
WordWithCount(spark,3)
WordWithCount(spark,3)
WordWithCount(,4)
==> log/flink-mac-taskexecutor-1-hadoop000.out <==
==> log/flink-mac-taskexecutor-0-hadoop000.out <==
WordWithCount(hadoop,3)
WordWithCount(hadoop,3)
WordWithCount(hadoop,3)
WordWithCount(hadoop,3)
WordWithCount(hadoop,3)
WordWithCount(a,3)
WordWithCount(a,3)
WordWithCount(a,3)
WordWithCount(a,3)
WordWithCount(a,3)
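Each count line appears five times because the window is sliding: with a size of 5 seconds and a slide of 1 second, every input record belongs to five overlapping windows, so its count is emitted five times. If one result per period is enough, a tumbling window is the usual alternative; a minimal sketch against the same Flink 1.6 API, assuming the text stream and WordWithCount case class from the program above:

// tumbling 5-second window: each record belongs to exactly one window,
// so each count is printed once per window instead of five times
val tumblingCounts = text
  .flatMap { w => w.split("\\s") }
  .filter(_.nonEmpty)
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))  // size only, no slide parameter
  .sum("count")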