参考: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html
1. 窗口操作(Window Operations)
Duration 时间窗口 (Batch Duration , slide duration, window duration)
* Batch Duration
批处理间隔,它是指Spark Streaming以多少时间间隔为单位来提交任务逻辑。比如1min,30s.这一参数将会伴随整个StreamingContext的生命周期且无法重新设置。
Spark Streaming处理数据的单位是一批,Spark Streaming系统需要设置间隔使得数据汇总到一定的量后再一并进行批处理,这个间隔就是batch duration. 此参数决定提交作业的频率和数据处理的延迟
1.batch duration(最基本时间间隔)
2.slide duration(处理数据时间间隔)
3.window duration(处理数据量的时间间隔)
eg:batch duration=10s
设置滑动窗口是 2*(batch duration)
窗口间隔是 3*(batch duration)
一个任务是reduceByKey ->print的计算;
20s后提交一次任务,会把20s内的数据提交计算(window 大小为2,第一次可能并未撑满,随着时间的推进,窗口会被最终撑满)
->
之后每过20s(滑动窗口)提交任务一次任务,会把最后30s(window duration)的数据提交计算
官方实例:
streaming-dstream-window.png
2. 离散数据集(Discretized Streams <DStreams>)
SparkStreaming抽象了离散数据流(Dstream,Discretized Stream)这个概念,它包含了一组连续的RDD,这一组连续的RDD代表了连续的流式数据;
DStream是一组时间序列上连续的RDD来表示的,每个RDD都包含特定时间间隔内的数据流。我们对DStream上的各种操作最终都会映射到内部的RDD中。
Spark输入源对照表
输入源 | 说明 |
---|---|
actorStream | 基于Akka actor 的输入源,需要用户定义actor接收器 |
fileStream | 创建输入源用来监控文件系统的变化,若有新文件添加,则将它读入并输入数据流 |
queueStream | 基于一个RDD队列创建一个输入源 |
socketStream | 通过TCP套接字接口创建输入源 |
kafkaStream | 通过读取Kafka分布式消息队列作为数据的输入源 |
flumeStream | 以Flume数据源作为输入源 |
twitterStream | 以Twitter数据源作为输入源 |
mqttStream | 以MQTT数据源作为输入源 |
zeromqStream | 以ZeroMQ数据源作为输入源 |
1)Transformations on DStreams
与RDD类似
官方说明: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html#transformations-on-dstreams
2) Output Operations on DStreams
官方文档: https://spark.apache.org/docs/2.1.3/streaming-programming-guide.html#output-operations-on-dstreams
package sparkstreaming
import java.io.PrintWriter
import java.net.ServerSocket
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]): Unit = {
val host = "localhost"
val port = 1024
val conf = new SparkConf()
.setAppName("Streaming Word Count App")
.setMaster("local[*]")
//指定StreamingContext的conf和batch duration=5s
//默认slide duration和window duration和 batch duration相同
//即每五秒提交一次任务,每次计算数据量是最后5秒的数据
//也可指定slide duration和window duration,建议最好是batch的整数倍
val sc = new StreamingContext(conf, Seconds(5))
sc.sparkContext.setLogLevel("ERROR")
val socketStreaming = sc.socketTextStream(host, port)
val wordCountUnit = socketStreaming.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
//打印计算结果默认10行
wordCountUnit.print()
//计算任务流程创建结束
sc.start() //启动执行计划
sc.awaitTermination()
sc.stop()
}
}
/**测试socket server 发送消息*/
object socketMessage{
def main(args: Array[String]): Unit = {
val counter=new AtomicInteger(0)
val serverSocket=new ServerSocket(1024)
val executorPool=Executors.newCachedThreadPool()
println("准备连接...")
while(true){
val socket=serverSocket.accept()
print("start write data...")
executorPool.execute(new Runnable {
override def run(): Unit = {
while (true){
Thread.sleep(1000)
val writer = new PrintWriter(socket.getOutputStream)
writer.println("Word hello"+counter.getAndIncrement())
writer.flush()
}
}
})
Thread.sleep(2000)
}
Thread.sleep(100)
executorPool.shutdown()
}
}
结果:
-------------------------------------------
Time: 1563209290000 ms
-------------------------------------------
(hello2,1)
(hello3,1)
(Word,5)
(hello4,1)
(hello5,1)
(hello6,1)
-------------------------------------------
Time: 1563209295000 ms
-------------------------------------------
(hello10,1)
(Word,5)
(hello11,1)
(hello7,1)
(hello8,1)
(hello9,1)
...
网友评论