美文网首页
Spark编程基础(Scala版)——Spark Streami

Spark编程基础(Scala版)——Spark Streami

作者: kaiker | 来源:发表于2021-08-09 19:16 被阅读0次

1、流计算概述

关系数据库并不是为存储快速、连续到达的流数据而设计的,不支持连续处理

流计算的处理流程一般包含:数据实时采集、数据实时计算、数据实时查询

流计算数据处理流程

2、Spark Streaming

2.1 Spark Streaming设计

将实时输入数据流以时间片为单位进行拆分,然后采用Spark引擎以类似批处理的方式处理每个时间片数据

Spark Streaming

Spark Streaming最主要的抽象是离散化数据流(Discretized Stream, DStream)标识连续不断的数据流。Spark Streaming的输入数据按照时间片分成一段一段,每一段数据转换成RDD,DSteam的操作最终都被转变为相应的RDD操作

DStream操作

2.2 与Storm区别

Spark Streaming 无法实现毫秒级计算,因为分解为批处理后,产生多个Spark作用和,还要经过DAG规划、任务管理器,有一定开销

RDD数据级的容错更加高效

3、DStream操作概述

3.1 Spark Streaming工作机制

在Spark Streaming中,有一个Receiver,作为一个长期运行Task运行在Executor上,每个Receiver负责一个DStream输入流。Receiver收到数据后提交给Spark Streaming处理并对结果进行分发。

Spark Streaming

3.2 编写程序基本步骤

  • 创建DStream来定义输入源
  • 通过对DStream应用转换操作和输出操作来定义流计算
  • 调用StreamingContext对象的start()方法来开始接受数据和处理流程
  • 调用StreamingContext对象的awaitTermination()方法来等到计算进程结束或调用stop()

3.3 创建StreamingContext对象

  • 表示按1秒进行切分
    scala> import  org.apache.spark.streaming._
    scala> val  ssc = new StreamingContext(sc, Seconds(1))

4、基本输入源

4.1 文件源

  • textFileStream()
    scala> import  org.apache.spark.streaming._
    scala> val  ssc = new StreamingContext(sc, Seconds(20))
    scala> val  lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
    scala> val  words = lines.flatMap(_.split(" "))
    scala> val  wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    scala> wordCounts.print()
    scala> ssc.start()
    scala> ssc.awaitTermination()

4.2 RDD队列流

  • 可以调用StreamingContext对象的queueStream()方法来创建基于RDD队列的DStream
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object QueueStream {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val rddQueue =new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
ssc.start()
for (i <- 1 to 10){
        rddQueue += ssc.sparkContext.makeRDD(1 to 100,2)
        Thread.sleep(1000)
    }
    ssc.stop()
  }
}

5、高级数据源

  • Broker kafka服务器
  • Topic 每条发布到Kafka集群的消息都有一个类别,这个类别是Topic,物理上不同Topic消息分开存储
  • Partition 每个Topic包含一个或多个
  • Producer 负责发布消息到Broker
  • Consumer 想Broker读取消息
  • Consumer Group 每个Consumer属于一个特定的Group
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount{
def main(args:Array[String]){
StreamingExamples.setStreamingLogLevels()
val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sc,Seconds(10))
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
val zkQuorum = "localhost:2181" //Zookeeper服务器地址
val group = "1"  //Topic所在的group,可以设置为自己想要的名称,比如不用1,而是val group = "test-consumer-group" 
val topics = "wordsender"  //topics的名称
val numThreads = 1  //每个topic的分区数
val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap) // 这里用到了ssc
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x,1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) //这行代码的含义在下一节的窗口转换操作中会有介绍
wordCounts.print
ssc.start
ssc.awaitTermination
}
}

6、转换操作

6.1 无状态转换

不记录历史状态就是无状态的

  • map(func)
  • flatMap(func)
  • filter(func)
  • repartition(numPartitions)
  • reduce(func) 返回一个包含单元素RDD的新DStream
  • count()
  • union(otherStream)
  • countByValue()
  • reduceByKey(func, [numTasks])
  • join(otherStream,[numTasks])
  • cogroup(otherStream,[numTasks]) (K,V) (K,W)会变成(K, Seq(V), Seq(W))
  • transform(func)
武装淘汰转换操作

6.2 有状态转换

滑动窗口

窗口操作
窗口操作
  • reduceByKeyAndWindo(+, -_-, Minutes(2), Seconds(10), 2)
reduceByKeyAndWindow

updateStateByKey

updateStateByKey会对DStream中的数据根据key计算,然后对各个批次进行累加

  • updateStateByKey(updateFunc)
  • updateFunc (Seq[V], Options[S]) => Option[S] 第一个参数就是当前key对应的所有value,第二参数标识当前key的历史状态,计算后得到最新的状态
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
  def main(args: Array[String]) {
    //定义状态更新函数
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
      StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")    //设置检查点,检查点具有容错机制
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    sc.start()
    sc.awaitTermination()
  }
}

相关文章

网友评论

      本文标题:Spark编程基础(Scala版)——Spark Streami

      本文链接:https://www.haomeiwen.com/subject/jlrovltx.html