Spark Streaming是一个实时流处理框架,实时流处理产生的背景是时效性高,数据量大,个人认为从严格意义上讲,Spark Streaming并不能算是实时流处理,只不过批次可以设置的特别小,接近实时而已,目前比较流行的实时流处理框架有:
- Apache Storm (storm.apache.org)
- Apache Spark Streaming (spark.apache.org)
- IBM Stream
- Yahoo!S4
- LinkedIn Kafka (kafka.apache.org)
- flink (flink.apache.org)
实时流处理架构与技术选型
Spark Streaming是从Spark Core上扩展的一个可扩展的、高吞吐量的、容错的实时流处理框架,数据来源可以是Kafka、Flume、Kinesis、TCP sockets等,处理时可以使用map、reduce、 join、window等复杂的算子表达式,最终的处理结果可以写到filesystem(HDFS)、databases(mysql)等,而且还可以把Spark的机器学习和图计算应用到这些流数据上。

工作原理是:Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,
然后把小的数据块传给Spark Engine处理。

Spark Streaming提供了一个高级别的抽象概念叫做DStream(discretized stream),DStreaming表示的是持续的一系列的RDD(Internally, a DStream is represented by a continuous series of RDDs,Each RDD in a DStream contains data from a certain interval),对DStream操作算子,比如map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作,因为一个DStream是由不同批次的RDD所构成的。关于DStream还有一点,就是对于需要receiver的起码需要两个线程,一个线程用于接收数据,一个线程用于处理数据(Every input DStream (except file stream, discussed later in this section) is associated with a Receiver object which receives the data from a source and stores it in Spark’s memory for processing.)
同Spark SQL使用SparkSession作为入口点类似,Spark Streaming使用StreamingContext作为程序的入口点,怎样得到一个StreamingContext呢,可以通过如下两种方式:
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
val ssc2 = new StreamingContext(sc, Seconds(1)) //sc是SparkContext
其中Seconds(1)表示batch interval,即多久作为一个批次,设置为1表示1秒为一个批次,这个可以根据你的应用程序需求的延迟要求以及集群可用的资源情况来设置(The batch interval must be set based on the latency requirements of your application and available cluster resources)
在StreamingContext源代码中可以找到如下两个构造方法:
def this(sparkContext: SparkContext, batchDuration: Duration) = { // 1
this(sparkContext, null, batchDuration)
}
def this(conf: SparkConf, batchDuration: Duration) = { //2
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
可见当你使用构造方法2传递SparkConf这个参数进去时,底层还是创建一个SparkContext对象,然后调用1那个构造方法。
有了StreamingContext就可以做一些事情了(摘自官网):
- Define the input sources by creating input DStreams.
- Define the streaming computations by applying transformation and output operations to DStreams.
- Start receiving data and processing it using streamingContext.start().
- Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
- The processing can be manually stopped using streamingContext.stop().
这里有几点需要注意: - Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
-
streaming sources
Spark Streaming提供了两类内建的数据源:
- 基础数据源: 比如 file systems, socket connections等
- 高级数据源: Kafka, Flume, Kinesis等
不管是哪类数据源,只要是基于receiver的(比如sockets, Kafka, Flume),需要注意以下两点:
- 如果是在本地运行,不要使用 “local” 或者“local[1]”,否则的话只有一个线程用于执行任务,该线程会用于接收数据,那么就没有线程用于处理接收到的数据,使用本地运行使用local[n]时,n应该大于receiver的数量
- 如果是在生产环境(集群)下运行,分配给Spark Streaming Application的core的数量也要大于receiver的数量,否则也是,只能接收数据,没有core去处理数据。
网友评论