-
添加SBT依赖
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1"
-
编写代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Streaming word count.
  *
  * Reads lines of text from a TCP socket, splits each line on single spaces,
  * and prints the number of occurrences of every word seen in each batch.
  */
object Wordcount {

  /** Program entry point.
    *
    * Builds the streaming pipeline, starts it, and blocks until the streaming
    * context terminates.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // Spark configuration: local master with 4 threads, application name "Wordcount".
    val conf = new SparkConf().setMaster("local[4]").setAppName("Wordcount")

    // Streaming context; incoming data is collected in batches of 3 seconds.
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // Receive text lines from the given host and port.
    val socketDStream = streamingContext.socketTextStream("192.168.0.100", 9999)

    // Flatten every line into its space-separated words.
    val flatMapDStream = socketDStream.flatMap(_.split(" "))

    // Pair every word with an initial count of 1 so counts can be summed by key.
    val mapDStream = flatMapDStream.map((_, 1))

    // Sum the counts per word to get each word's frequency.
    val reduceByKeyDStream = mapDStream.reduceByKey(_ + _)

    // Print the result of each batch.
    reduceByKeyDStream.print()

    // Start the streaming computation and wait for it to terminate.
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
网友评论