美文网首页
010 Spark Stream 案例2:单词统计例子

010 Spark Stream 案例2:单词统计例子

作者: 逸章 | 来源:发表于2019-12-21 22:01 被阅读0次

1 环境搭建

Scala版本: scala-2.11.12
Spark版本: spark-2.4.4-bin-hadoop2.7


image.png

2 代码

整体结构

image.png

SBT信息

image.png

两个类的代码信息

image.png
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}   
    /**
    * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
    *
    * Usage: NetworkWordCount <hostname> <port>
    * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
    *
    * To run this on your local machine, you need to first run a Netcat server
    * `$ nc -lk 9999`
    * and then run the example
    $SPARK_HOME/bin/spark-submit  --master local[2] --class NetworkWordCount 
    --name NetworkWordCount target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999*/
    object NetworkWordCount {
        def main(args: Array[String]) {
            if (args.length < 2) {
                System.err.println("Usage: NetworkWordCount <hostname> <port>")
                System.exit(1)
            }           
            StreamingExamples.setStreamingLogLevels()           
            // Create the context with a 1 second batch size
            val sparkConf = new SparkConf().setAppName("NetworkWordCount")
            val ssc = new StreamingContext(sparkConf, Seconds(1))
            
            // Create a socket stream on target ip:port and count the
            // words in input stream of \n delimited text (eg. generated by 'nc')
            // Note that no duplication in storage level only for running locally.
            // Replication necessary in distributed scenario for fault tolerance.
            val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
            val words = lines.flatMap(_.split(" "))
            val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
            wordCounts.print()
            ssc.start()
            ssc.awaitTermination()
        }
    }
image.png
import org.apache.log4j.{Level, Logger}

import org.apache.spark.internal.Logging

/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {

  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

3 打包并执行

运行nc

image.png
打包
image.png
执行
$SPARK_HOME/bin/spark-submit  --master local[2] --class NetworkWordCount --name NetworkWordCount target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
image.png
image.png

相关文章

网友评论

      本文标题:010 Spark Stream 案例2:单词统计例子

      本文链接:https://www.haomeiwen.com/subject/xllnnctx.html