-
当程序首次启动,其将创建一个新的StreamingContext,设置所有的流并调用start()。
-
当程序在失效后重启,其将依据检查点目录的检查点数据重新创建一个StreamingContext。 通过使用StraemingContext.getOrCreate很容易获得这个性能。
-
使用kafka direct api ,driver端会lauch jobs useing offsetranges 在executor,然后executor端会直接使用
offsetrange读取kafka 指定分区和指定偏移量,数据会被缓存在executor端(这里面还存在executor调度在kafka分区leader上的问题,参数LocationStrategies.PreferConsistent) -
只要开启checkpoint,HasoffsetRange中的topic和offset都会被存储,便于从checkpoint中,错误恢复
-
task端和driver端容错,因为kafka是可重放数据源,所以只要重新driver端从checkpoint重新启动,就会重新消费数据,如果在sink端保证实现事物,则可以实现恰好一次语义,默认实现最少一次语义
ssc.checkpoint("hdfs://...")
# 创建和设置一个新的StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = new StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # 设置检查点目录
return ssc
# 从检查点数据中获取StreamingContext或者重新创建一个
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# 在需要完成的context上做额外的配置
# 无论其有没有启动
context ...
# 启动context
context.start()
# 通常,检查点设置间隔是5-10个DStream的滑动间隔
# minbatch 3s
dstream.checkpoint(30s)
contaxt.awaitTermination()
driver 程序容错
驱动器程序的容错要求我们以特殊的方式创建 StreamingContext。我们需要把检查点目录提供给 StreamingContext。与直接调用 new StreamingContext 不同,应该使用 StreamingContext.getOrCreate() 函数。
配置过程如下:
1、 启动Driver自动重启功能
yarn:设置yarn.resourcemanager.am.max-attempts
或者spark.yarn.maxAppAttempts
2、 设置checkpoint
StreamingContext.setCheckpoint(hdfsDirectory)
3、支持从checkpoint中重启配置
val ssc = new StreamingContext
ssc.checkpoint(checkpointDirectory)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))```
网友评论