1、流计算概述
关系数据库并不是为存储快速、连续到达的流数据而设计的,不支持连续处理
流计算的处理流程一般包含:数据实时采集、数据实时计算、数据实时查询

2、Spark Streaming
2.1 Spark Streaming设计
将实时输入数据流以时间片为单位进行拆分,然后采用Spark引擎以类似批处理的方式处理每个时间片数据

Spark Streaming最主要的抽象是离散化数据流(Discretized Stream, DStream)标识连续不断的数据流。Spark Streaming的输入数据按照时间片分成一段一段,每一段数据转换成RDD,DSteam的操作最终都被转变为相应的RDD操作

2.2 与Storm区别
Spark Streaming 无法实现毫秒级计算,因为分解为批处理后,产生多个Spark作用和,还要经过DAG规划、任务管理器,有一定开销
RDD数据级的容错更加高效
3、DStream操作概述
3.1 Spark Streaming工作机制
在Spark Streaming中,有一个Receiver,作为一个长期运行Task运行在Executor上,每个Receiver负责一个DStream输入流。Receiver收到数据后提交给Spark Streaming处理并对结果进行分发。

3.2 编写程序基本步骤
- 创建DStream来定义输入源
- 通过对DStream应用转换操作和输出操作来定义流计算
- 调用StreamingContext对象的start()方法来开始接受数据和处理流程
- 调用StreamingContext对象的awaitTermination()方法来等到计算进程结束或调用stop()
3.3 创建StreamingContext对象
- 表示按1秒进行切分
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(1))
4、基本输入源
4.1 文件源
- textFileStream()
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(20))
scala> val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
scala> val words = lines.flatMap(_.split(" "))
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
scala> wordCounts.print()
scala> ssc.start()
scala> ssc.awaitTermination()
4.2 RDD队列流
- 可以调用StreamingContext对象的queueStream()方法来创建基于RDD队列的DStream
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object QueueStream {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val rddQueue =new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
val mappedStream = queueStream.map(r => (r % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()
ssc.start()
for (i <- 1 to 10){
rddQueue += ssc.sparkContext.makeRDD(1 to 100,2)
Thread.sleep(1000)
}
ssc.stop()
}
}
5、高级数据源
- Broker kafka服务器
- Topic 每条发布到Kafka集群的消息都有一个类别,这个类别是Topic,物理上不同Topic消息分开存储
- Partition 每个Topic包含一个或多个
- Producer 负责发布消息到Broker
- Consumer 想Broker读取消息
- Consumer Group 每个Consumer属于一个特定的Group
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
object KafkaWordCount{
def main(args:Array[String]){
StreamingExamples.setStreamingLogLevels()
val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sc,Seconds(10))
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
val zkQuorum = "localhost:2181" //Zookeeper服务器地址
val group = "1" //Topic所在的group,可以设置为自己想要的名称,比如不用1,而是val group = "test-consumer-group"
val topics = "wordsender" //topics的名称
val numThreads = 1 //每个topic的分区数
val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap) // 这里用到了ssc
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x,1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) //这行代码的含义在下一节的窗口转换操作中会有介绍
wordCounts.print
ssc.start
ssc.awaitTermination
}
}
6、转换操作
6.1 无状态转换
不记录历史状态就是无状态的
- map(func)
- flatMap(func)
- filter(func)
- repartition(numPartitions)
- reduce(func) 返回一个包含单元素RDD的新DStream
- count()
- union(otherStream)
- countByValue()
- reduceByKey(func, [numTasks])
- join(otherStream,[numTasks])
- cogroup(otherStream,[numTasks]) (K,V) (K,W)会变成(K, Seq(V), Seq(W))
- transform(func)

6.2 有状态转换
滑动窗口


- reduceByKeyAndWindo(+, -_-, Minutes(2), Seconds(10), 2)

updateStateByKey
updateStateByKey会对DStream中的数据根据key计算,然后对各个批次进行累加
- updateStateByKey(updateFunc)
- updateFunc (Seq[V], Options[S]) => Option[S] 第一个参数就是当前key对应的所有value,第二参数标识当前key的历史状态,计算后得到最新的状态
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
def main(args: Array[String]) {
//定义状态更新函数
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
StreamingExamples.setStreamingLogLevels() //设置log4j日志级别
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
val sc = new StreamingContext(conf, Seconds(5))
sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/") //设置检查点,检查点具有容错机制
val lines = sc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
sc.start()
sc.awaitTermination()
}
}
网友评论