1. Program entry point
// Initialize the StreamingContext
SparkConf conf = new SparkConf().setAppName("SparkStreaming_demo")
        .set("spark.streaming.stopGracefullyOnShutdown", "true");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(30));
// Build the processing chain
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "exampleGroup");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("exampleTopic");
JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
// Extract the message value from each ConsumerRecord
JavaDStream<String> transformDStream = inputDStream.transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> recordRdd) throws Exception {
        return recordRdd.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> record) throws Exception {
                return record.value();
            }
        });
    }
});
// Drop blank messages
JavaDStream<String> filterDStream = transformDStream.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String v1) throws Exception {
        return StringUtils.isNotBlank(v1);
    }
});
filterDStream.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
    @Override
    public void call(JavaRDD<String> javaRDD, Time batchTime) throws Exception {
        // Suffix the output path with the batch time: saveAsTextFile fails if the
        // target directory already exists, so a fixed path would break on the second batch.
        javaRDD.saveAsTextFile("/home/example/result/" + batchTime.milliseconds());
    }
});
// Start the job and block until termination
streamingContext.start();
streamingContext.awaitTermination();
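One caveat before diving into the internals: with enable.auto.commit set to false, nothing in this program ever commits consumer offsets, so a restarted job simply falls back to auto.offset.reset. Below is a sketch of the usual spark-streaming-kafka-0-10 pattern for committing offsets back to Kafka once a batch succeeds; it is an illustration, not part of the original program, and the processing body is elided:

import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

inputDStream.foreachRDD(rdd -> {
    // RDDs coming out of createDirectStream carry their Kafka offset ranges.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // ... process the batch here ...
    // After the batch's work succeeds, commit the consumed offsets back to Kafka asynchronously.
    ((CanCommitOffsets) inputDStream.inputDStream()).commitAsync(offsetRanges);
});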
This article focuses on the StreamingContext initialization process.
2. Into the source code
- First, open org.apache.spark.streaming.StreamingContext.scala
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      _cp.graph.setContext(this)
      _cp.graph.restoreCheckpointData()
      _cp.graph
    } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(_batchDur)
      newGraph
    }
  }

  private val nextInputStreamId = new AtomicInteger(0)

  private[streaming] var checkpointDir: String = {
    if (isCheckpointPresent) {
      sc.setCheckpointDir(_cp.checkpointDir)
      _cp.checkpointDir
    } else {
      null
    }
  }

  private[streaming] val scheduler = new JobScheduler(this)

  private[streaming] val uiTab: Option[StreamingTab] =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(new StreamingTab(this))
    } else {
      None
    }

  private var state: StreamingContextState = INITIALIZED

  ......
}
During StreamingContext initialization, the constructor:
- creates the DStreamGraph and sets its BatchDuration to the batch interval of the Spark Streaming job;
- sets the checkpointDir (when restoring from a checkpoint);
- creates the JobScheduler;
- creates the streaming UI tab;
- initializes the state field to INITIALIZED;
- and so on.
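To make the state handling concrete, here is a minimal sketch (assuming Spark 1.4+, where getState() was added, and local mode; the class name, app name, and queue-backed stream are all hypothetical) that observes the INITIALIZED → ACTIVE → STOPPED transitions from the driver side:

import java.util.LinkedList;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StateLifecycleDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("state_demo");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(30));
        // Right after the constructor runs: the state field shown in the source above.
        System.out.println(ssc.getState());   // INITIALIZED
        // A graph with at least one output stream is required before start().
        Queue<JavaRDD<String>> queue = new LinkedList<>();
        ssc.queueStream(queue).print();
        ssc.start();
        System.out.println(ssc.getState());   // ACTIVE
        ssc.stop(true, false);                // also stop the SparkContext, non-gracefully
        System.out.println(ssc.getState());   // STOPPED
    }
}

start() consults this same state field: starting a context that has already been stopped throws an IllegalStateException.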
- Next, open org.apache.spark.streaming.DStreamGraph.scala
final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  var rememberDuration: Duration = null
  var checkpointInProgress = false

  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null

  ......
}
When the DStreamGraph is created, it declares the inputStreams and outputStreams buffers, among other fields.
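These two buffers are filled indirectly by the user program: every stream created from the context registers itself as an input stream, while every output operation (print, foreachRDD, saveAsTextFiles, ...) registers an output stream. A sketch of which calls land where, under the same hypothetical local-mode setup as above:

import java.util.LinkedList;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class GraphRegistrationDemo {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("graph_demo");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
        Queue<JavaRDD<String>> queue = new LinkedList<>();

        // Creating a stream from the context adds one entry to graph.inputStreams.
        JavaDStream<String> lines = ssc.queueStream(queue);

        // Intermediate transformations only link parent DStreams; nothing is registered.
        JavaDStream<String> upper = lines.map(String::toUpperCase);

        // Each output operation registers one entry in graph.outputStreams;
        // one job is generated per output stream every batch interval.
        upper.print();

        ssc.start();                           // fails fast if outputStreams is empty
        ssc.awaitTerminationOrTimeout(3000);   // run a few empty batches, then exit
        ssc.stop(true, false);
    }
}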
- Then open org.apache.spark.streaming.scheduler.JobScheduler.scala
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {

  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus(ssc.sparkContext.listenerBus)

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

  private var executorAllocationManager: Option[ExecutorAllocationManager] = None

  private var eventLoop: EventLoop[JobSchedulerEvent] = null

  ......
}
When the JobScheduler is created, it declares jobSets, numConcurrentJobs, jobExecutor, jobGenerator, and the other fields shown above.
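Of these, numConcurrentJobs is worth a note: as the source shows, it is read from spark.streaming.concurrentJobs (default 1) and sizes the streaming-job-executor thread pool, so by default the jobs of only one batch run at a time. A sketch of raising it; this knob is undocumented and lets the output operations of different batches overlap, so whether it is safe depends on the job:

import org.apache.spark.SparkConf;

// Allow the jobs of up to two batches to run concurrently in the
// "streaming-job-executor" pool shown in the source above.
SparkConf conf = new SparkConf()
        .setAppName("SparkStreaming_demo")
        .set("spark.streaming.concurrentJobs", "2");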
3. Summary
In summary, initializing a StreamingContext:
- creates the DStreamGraph, which declares inputStreams, outputStreams, and other fields;
- creates the JobScheduler, which declares jobSets, numConcurrentJobs, jobExecutor, jobGenerator, and other fields.