美文网首页
Spark Streaming (1) ——WordCount

Spark Streaming (1) ——WordCount

作者: bclz | 来源:发表于2019-07-16 00:50 被阅读0次

    参考: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html

    1. 窗口操作(Window Operations)

    Duration 时间窗口 (Batch Duration , slide duration, window duration)

    * Batch Duration

    批处理间隔,它是指Spark Streaming以多少时间间隔为单位来提交任务逻辑。比如1min,30s.这一参数将会伴随整个StreamingContext的生命周期且无法重新设置。

    Spark Streaming处理数据的单位是一批,Spark Streaming系统需要设置间隔使得数据汇总到一定的量后再一并进行批处理,这个间隔就是batch duration. 此参数决定提交作业的频率和数据处理的延迟
    1.batch duration(最基本时间间隔)
    2.slide duration(处理数据时间间隔)
    3.window duration(处理数据量的时间间隔)

    eg:batch duration=10s
        设置滑动窗口是 2*(batch duration)
        窗口间隔是 3*(batch duration)
        
    一个任务是reduceByKey ->print的计算;
    20s后提交一次任务,会把20s内的数据提交计算(window 大小为2,第一次可能并未撑满,随着时间的推进,窗口会被最终撑满)
            ->
    之后每过20s(滑动窗口)提交任务一次任务,会把最后30s(window duration)的数据提交计算
    

    官方实例:


    streaming-dstream-window.png

    2. 离散数据集(Discretized Streams <DStreams>)

    SparkStreaming抽象了离散数据流(Dstream,Discretized Stream)这个概念,它包含了一组连续的RDD,这一组连续的RDD代表了连续的流式数据;
    DStream是一组时间序列上连续的RDD来表示的,每个RDD都包含特定时间间隔内的数据流。我们对DStream上的各种操作最终都会映射到内部的RDD中。

    Spark输入源对照表
    输入源 说明
    actorStream 基于Akka actor 的输入源,需要用户定义actor接收器
    fileStream 创建输入源用来监控文件系统的变化,若有新文件添加,则将它读入并输入数据流
    queueStream 基于一个RDD队列创建一个输入源
    socketStream 通过TCP套接字接口创建输入源
    kafkaStream 通过读取Kafka分布式消息队列作为数据的输入源
    flumeStream 以Flume数据源作为输入源
    twitterStream 以Twitter数据源作为输入源
    mqttStream 以MQTT数据源作为输入源
    zeromqStream 以ZeroMQ数据源作为输入源

    1)Transformations on DStreams

    与RDD类似

    官方说明: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html#transformations-on-dstreams

    2) Output Operations on DStreams

    官方文档: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html#output-operations-on-dstreams

    package sparkstreaming
    
    import java.io.PrintWriter
    import java.net.ServerSocket
    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicInteger
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StreamingWordCount {
    
    
      def main(args: Array[String]): Unit = {
        val host = "localhost"
        val port = 1024
        val conf = new SparkConf()
          .setAppName("Streaming Word Count App")
          .setMaster("local[*]")
    
        //指定StreamingContext的conf和batch duration=5s
        //默认slide duration和window duration和 batch duration相同
        //即每五秒提交一次任务,每次计算数据量是最后5秒的数据
        //也可指定slide duration和window duration,建议最好是batch的整数倍
        val sc = new StreamingContext(conf, Seconds(5))
        sc.sparkContext.setLogLevel("ERROR")
        val socketStreaming = sc.socketTextStream(host, port)
        val wordCountUnit = socketStreaming.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        //打印计算结果默认10行
        wordCountUnit.print()
        //计算任务流程创建结束
    
        sc.start() //启动执行计划
        sc.awaitTermination()
        sc.stop()
      }
    
    }
    
    /**测试socket server 发送消息*/
    object socketMessage{
    
    
      def main(args: Array[String]): Unit = {
          val counter=new AtomicInteger(0)
    
          val serverSocket=new ServerSocket(1024)
          val executorPool=Executors.newCachedThreadPool()
          println("准备连接...")
          while(true){
    
            val socket=serverSocket.accept()
            print("start write data...")
            executorPool.execute(new Runnable {
              override def run(): Unit = {
                while (true){
                  Thread.sleep(1000)
                  val writer = new PrintWriter(socket.getOutputStream)
                  writer.println("Word hello"+counter.getAndIncrement())
                  writer.flush()
                }
    
              }
            })
            Thread.sleep(2000)
          }
          Thread.sleep(100)
          executorPool.shutdown()
      }
    
    }
    
    
    

    结果:

    -------------------------------------------
    Time: 1563209290000 ms
    -------------------------------------------
    (hello2,1)
    (hello3,1)
    (Word,5)
    (hello4,1)
    (hello5,1)
    (hello6,1)
    
    -------------------------------------------
    Time: 1563209295000 ms
    -------------------------------------------
    (hello10,1)
    (Word,5)
    (hello11,1)
    (hello7,1)
    (hello8,1)
    (hello9,1)
    ...
    

    相关文章

      网友评论

          本文标题:Spark Streaming (1) ——WordCount

          本文链接:https://www.haomeiwen.com/subject/hxsakctx.html