美文网首页
Spark Streaming (1) ——WordCount

Spark Streaming (1) ——WordCount

作者: bclz | 来源:发表于2019-07-16 00:50 被阅读0次

参考: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html

1. 窗口操作(Window Operations)

Duration 时间窗口 (Batch Duration , slide duration, window duration)

* Batch Duration

批处理间隔,它是指Spark Streaming以多少时间间隔为单位来提交任务逻辑。比如1min,30s.这一参数将会伴随整个StreamingContext的生命周期且无法重新设置。

Spark Streaming处理数据的单位是一批,Spark Streaming系统需要设置间隔使得数据汇总到一定的量后再一并进行批处理,这个间隔就是batch duration. 此参数决定提交作业的频率和数据处理的延迟
1.batch duration(最基本时间间隔)
2.slide duration(处理数据时间间隔)
3.window duration(处理数据量的时间间隔)

eg:batch duration=10s
    设置滑动窗口是 2*(batch duration)
    窗口间隔是 3*(batch duration)
    
一个任务是reduceByKey ->print的计算;
20s后提交一次任务,会把20s内的数据提交计算(window 大小为2,第一次可能并未撑满,随着时间的推进,窗口会被最终撑满)
        ->
之后每过20s(滑动窗口)提交任务一次任务,会把最后30s(window duration)的数据提交计算

官方实例:


streaming-dstream-window.png

2. 离散数据集(Discretized Streams <DStreams>)

SparkStreaming抽象了离散数据流(Dstream,Discretized Stream)这个概念,它包含了一组连续的RDD,这一组连续的RDD代表了连续的流式数据;
DStream是一组时间序列上连续的RDD来表示的,每个RDD都包含特定时间间隔内的数据流。我们对DStream上的各种操作最终都会映射到内部的RDD中。

Spark输入源对照表
输入源 说明
actorStream 基于Akka actor 的输入源,需要用户定义actor接收器
fileStream 创建输入源用来监控文件系统的变化,若有新文件添加,则将它读入并输入数据流
queueStream 基于一个RDD队列创建一个输入源
socketStream 通过TCP套接字接口创建输入源
kafkaStream 通过读取Kafka分布式消息队列作为数据的输入源
flumeStream 以Flume数据源作为输入源
twitterStream 以Twitter数据源作为输入源
mqttStream 以MQTT数据源作为输入源
zeromqStream 以ZeroMQ数据源作为输入源

1)Transformations on DStreams

与RDD类似

官方说明: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html#transformations-on-dstreams

2) Output Operations on DStreams

官方文档: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html#output-operations-on-dstreams

package sparkstreaming

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {


  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val port = 1024
    val conf = new SparkConf()
      .setAppName("Streaming Word Count App")
      .setMaster("local[*]")

    //指定StreamingContext的conf和batch duration=5s
    //默认slide duration和window duration和 batch duration相同
    //即每五秒提交一次任务,每次计算数据量是最后5秒的数据
    //也可指定slide duration和window duration,建议最好是batch的整数倍
    val sc = new StreamingContext(conf, Seconds(5))
    sc.sparkContext.setLogLevel("ERROR")
    val socketStreaming = sc.socketTextStream(host, port)
    val wordCountUnit = socketStreaming.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    //打印计算结果默认10行
    wordCountUnit.print()
    //计算任务流程创建结束

    sc.start() //启动执行计划
    sc.awaitTermination()
    sc.stop()
  }

}

/**测试socket server 发送消息*/
object socketMessage{


  def main(args: Array[String]): Unit = {
      val counter=new AtomicInteger(0)

      val serverSocket=new ServerSocket(1024)
      val executorPool=Executors.newCachedThreadPool()
      println("准备连接...")
      while(true){

        val socket=serverSocket.accept()
        print("start write data...")
        executorPool.execute(new Runnable {
          override def run(): Unit = {
            while (true){
              Thread.sleep(1000)
              val writer = new PrintWriter(socket.getOutputStream)
              writer.println("Word hello"+counter.getAndIncrement())
              writer.flush()
            }

          }
        })
        Thread.sleep(2000)
      }
      Thread.sleep(100)
      executorPool.shutdown()
  }

}


结果:

-------------------------------------------
Time: 1563209290000 ms
-------------------------------------------
(hello2,1)
(hello3,1)
(Word,5)
(hello4,1)
(hello5,1)
(hello6,1)

-------------------------------------------
Time: 1563209295000 ms
-------------------------------------------
(hello10,1)
(Word,5)
(hello11,1)
(hello7,1)
(hello8,1)
(hello9,1)
...

相关文章

网友评论

      本文标题:Spark Streaming (1) ——WordCount

      本文链接:https://www.haomeiwen.com/subject/hxsakctx.html