1. Program entry point
// Initialize the StreamingContext
SparkConf conf = new SparkConf().setAppName("SparkStreaming_demo")
        .set("spark.streaming.stopGracefullyOnShutdown", "true");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(30));
// Build the processing chain
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "exampleGroup");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("exampleTopic");
JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaDStream<String> transformDStream = inputDStream.transform(
        new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> v1) throws Exception {
                JavaRDD<String> tempRdd = v1.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> record) throws Exception {
                        return record.value();
                    }
                });
                return tempRdd;
            }
        });
JavaDStream<String> filterDStream = transformDStream.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String v1) throws Exception {
        return StringUtils.isNotBlank(v1);
    }
});
filterDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> javaRDD) throws Exception {
        javaRDD.saveAsTextFile("/home/example/result/");
    }
});
// Start and wait for termination
streamingContext.start();
streamingContext.awaitTermination();
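For reference, the same chain can also be written with Java 8 lambdas. This is only a sketch that reuses the streamingContext, topics and kafkaParams defined above; the DStream chain built underneath is identical.

// Lambda form of the pipeline above:
//   transform  -> TransformedDStream
//   filter     -> FilteredDStream
//   foreachRDD -> ForEachDStream
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
stream.transform(rdd -> rdd.map(ConsumerRecord::value))
        .filter(StringUtils::isNotBlank)
        .foreachRDD(rdd -> rdd.saveAsTextFile("/home/example/result/"));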
This article focuses on how Spark Streaming builds this chain of processing steps.
2. Into the source code
2.1 Creating the InputDStream
- Enter org.apache.spark.streaming.kafka010.KafkaUtils.scala
def createDirectStream[K, V](
    jssc: JavaStreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): JavaInputDStream[ConsumerRecord[K, V]] = {
  new JavaInputDStream(
    createDirectStream[K, V](
      jssc.ssc, locationStrategy, consumerStrategy))
}

def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]] = {
  new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
}
createDirectStream simply constructs a new DirectKafkaInputDStream.
- Enter org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.scala
private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {

  override protected[streaming] val rateController: Option[RateController] = {
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new DirectKafkaRateController(id,
        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
    } else {
      None
    }
  }

  private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRatePerPartition", 0)
  ......
}
When the DirectKafkaInputDStream is constructed, it:
- creates the rateController: it reads the spark.streaming.backpressure.enabled setting to see whether backpressure is enabled, and if so builds a rate estimator and uses it as the rateController;
- reads spark.streaming.kafka.maxRatePerPartition to see whether a maximum per-partition consumption rate is configured; if so, it works together with the rateController above to bound how far the offsets advance, and therefore how much data each batch pulls (a configuration sketch follows this list);
- and so on.
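As a rough illustration, both knobs are ordinary Spark configuration entries; a minimal sketch, setting them on the SparkConf from section 1 (the rate value is an arbitrary example):

// Enable backpressure so a rateController is created, and cap each Kafka partition
// at 1000 records per second (example value).
SparkConf conf = new SparkConf().setAppName("SparkStreaming_demo")
        .set("spark.streaming.backpressure.enabled", "true")
        .set("spark.streaming.kafka.maxRatePerPartition", "1000");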
DirectKafkaInputDStream is a subclass of InputDStream, so the InputDStream constructor runs first.
- Enter org.apache.spark.streaming.dstream.InputDStream.scala
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
  extends DStream[T](_ssc) {

  ssc.graph.addInputStream(this)

  val id = ssc.getNewInputStreamId()
  ......
}
Here the DirectKafkaInputDStream just constructed is added to the inputStreams of the DStreamGraph owned by the StreamingContext that was initialized earlier.
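To make this concrete: every call to createDirectStream produces another DirectKafkaInputDStream, and each one registers itself with the same DStreamGraph. A small sketch (the second topic name is only an assumption for illustration):

// Each createDirectStream call builds a DirectKafkaInputDStream; the InputDStream
// constructor adds it to DStreamGraph.inputStreams and assigns it a new stream id.
JavaInputDStream<ConsumerRecord<String, String>> firstStream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList("exampleTopic"), kafkaParams));
JavaInputDStream<ConsumerRecord<String, String>> secondStream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList("anotherTopic"), kafkaParams));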
2.2 The transform operation
- Enter org.apache.spark.streaming.api.java.JavaDStreamLike.scala (the Java-facing transform is defined in the JavaDStreamLike trait)
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
  implicit val cm: ClassTag[U] = fakeClassTag
  def scalaTransform (in: RDD[T]): RDD[U] =
    transformFunc.call(wrapRDD(in)).rdd
  dstream.transform(scalaTransform(_))
}
- Then enter org.apache.spark.streaming.dstream.DStream.scala
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  val cleanedF = context.sparkContext.clean(transformFunc, false)
  transform((r: RDD[T], _: Time) => cleanedF(r))
}

def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  val cleanedF = context.sparkContext.clean(transformFunc, false)
  val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
    assert(rdds.length == 1)
    cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
  }
  new TransformedDStream[U](Seq(this), realTransformFunc)
}
The transform operation ultimately creates a TransformedDStream.
- Enter org.apache.spark.streaming.dstream.TransformedDStream.scala
private[streaming]
class TransformedDStream[U: ClassTag] (
    parents: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[U]
  ) extends DStream[U](parents.head.ssc) {

  override def dependencies: List[DStream[_]] = parents.toList

  override def slideDuration: Duration = parents.head.slideDuration
  ......
}
When the TransformedDStream is created, its dependencies are set as dependencies = parents.toList. Here parents is the Seq(this) passed in new TransformedDStream[U](Seq(this), realTransformFunc) above, and this is the inputDStream on which transform was invoked in inputDStream.transform.
At this point the TransformedDStream is linked to the upstream DirectKafkaInputDStream through its internally declared dependencies.
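Since the transform in section 1 only extracts the record value, the same link could also be built with map; the following is a sketch reusing the inputDStream from section 1. map creates a MappedDStream, whose dependencies point back at the DirectKafkaInputDStream in exactly the same way.

// Equivalent to the transform(...) call in section 1: map produces a MappedDStream
// whose dependencies list contains the DirectKafkaInputDStream it was called on.
JavaDStream<String> valueDStream = inputDStream.map(record -> record.value());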
2.3 The filter operation
Analogous to transform, the filter operation creates a FilteredDStream, which likewise uses dependencies to link itself to the upstream TransformedDStream.
- Enter org.apache.spark.streaming.dstream.FilteredDStream.scala
private[streaming]
class FilteredDStream[T: ClassTag](
    parent: DStream[T],
    filterFunc: T => Boolean
  ) extends DStream[T](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration
  ......
}
2.4 The foreachRDD operation
- Enter org.apache.spark.streaming.dstream.DStream.scala
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
Inside foreachRDD:
- a ForEachDStream is created;
- register() is called to register it.
2.4.1 Creating the ForEachDStream
- Enter org.apache.spark.streaming.dstream.ForEachDStream.scala
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration
  ......
}
As before, the ForEachDStream uses dependencies to link itself to the upstream FilteredDStream.
2.4.2 Registering via register()
- Enter org.apache.spark.streaming.dstream.DStream.scala
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
Here the ForEachDStream just created is added to the outputStreams of the DStreamGraph.
Note: every output operation in Spark Streaming produces a ForEachDStream, which means every output operation ends up registered in the outputStreams of the DStreamGraph.
For example, print ultimately calls foreachRDD (a usage sketch follows the snippet):
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
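As a usage sketch on the chain from section 1, adding print as below would register a second ForEachDStream in outputStreams, exactly like the foreachRDD call did:

// Another output operation on the same chain: print also goes through foreachRDD,
// so another ForEachDStream is added to DStreamGraph.outputStreams.
filterDStream.print(10);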
3. Summary
- An InputDStream is created and added to the inputStreams of the DStreamGraph.
- Every Spark Streaming operator produces a corresponding DStream (e.g. FilteredDStream, ForEachDStream, and so on).
- Each of these DStreams uses dependencies internally to link back to the upstream DStream.
- Chaining one DStream to the next in this way builds up the Spark Streaming processing chain (see the inspection sketch after this summary).
- All output operations run through the foreachRDD operator, which produces a ForEachDStream and adds it to the outputStreams of the DStreamGraph.
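If you want to see the resulting chain from the driver, one rough option (a sketch, not an official API pattern) is to peek at the underlying Scala DStream of each Java wrapper before starting the context and print its dependencies:

// dstream() exposes the wrapped Scala DStream; dependencies() is its list of parents.
// For the program in section 1, the parents printed here should be the
// TransformedDStream and the DirectKafkaInputDStream respectively.
System.out.println(filterDStream.dstream().dependencies());
System.out.println(transformDStream.dstream().dependencies());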