Spark Streaming
的 checkpoint
机制
Spark Streaming
若需要 7 * 24
不间断的运行,必须对诸如系统错误, JVM
错误等程序逻辑无关的错误 (Failures
) 导致 Driver
所在
的节点错误,具备一定的非应用程序出错的容错性。Spark Streaming
的 checkpoint
机制便是为此设计,它将足够多的信息 checkpoint
到某些具备容错性的存储系统如 hdfs
上,以便出错时能迅速恢复。
有两种数据可以进行 checkpoint
保存:
-
Metadata Checkpoint
配置信息:
SparkConf
,DStream
操作 -
Data Checkpoint
checkpoint
来自有状态转化操作的DStream
即,Metadata Checkpoint
主要时从 driver
失败中恢复,而 Data Checkpoint
用于对有状态的操作进行 Checkpoint
处理。
使用 Checkpoint
与 RDD
的 checkpoint
一样,需要从 SparkStreaming
入口设置检查点,即通过 StreamingContext.checkpoint(...)
设置 checkpoint
缓存路径,一般是缓存到 hdfs
上,同时, checkpoint
还需要满足两个条件:
- 若第一次启动,将创建新的
StreamingContext
- 若从失败中恢复,将从
checkpoint
中恢复上次的StreamingContext
, 恢复之后的DAG
仅含有依赖关系,并非原来的线性关系
因此,在创建 StreamingContext
是需要使用另一种方案 StreamingContext.getOrCreate()
, 其中 getOrCreate()
源码如下:
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
// 读取 checkpoint 数据
val checkpointOption = CheckpointReader.read(
checkpointPath, new SparkConf(), hadoopConf, createOnError)
// 创建 StreamingContext 函数,不存在则需要使用 creatingFunc 函数创建新实例
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
实例如下:
// 创建 SparkConf 属性配置内容
private val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkStreaming")
// 优雅的关闭程序,保证driver结束前处理完所有已接收的数据
.set("spark.streaming.stopGracefullyOnShutdown", "true")
private val path = "checkpoint"
// 创建测试用例
def testCheckpoint(): Unit = {
// 获取或创建 StreamingContext 实例
// 必须将业务逻辑写在 `creatingFunc` 中,并返回 `StreamingContext`,
// 若定义在外面,恢复操作意味着重新创建了重新创建一次 `StreamingContext`
// 因此会报异常 StateDStream@333398f has not been initialized
val ssc = StreamingContext.getOrCreate(path, () => {
// 创建 StreamingContext 新实例
val ssc = new StreamingContext(conf, Seconds(5))
// 设置 checkpoint 存储路径
ssc.checkpoint(path)
// 业务逻辑
val wc = ssc.socketTextStream("127.0.0.1", 9999)
.flatMap(_.split(" "))
.map((_, 1))
.updateStateByKey[Int]((seq: Seq[Int], op: Option[Int]) => Option(seq.sum + op.getOrElse(0)))
// 设置检查点时间
wc.checkpoint(Seconds(50))
wc.print()
ssc
})
// 启动 Streaming 逻辑
ssc.start()
ssc.awaitTermination()
}
注
在上述测试中,使用 nc -lk 9999
即可开启一个简易的 socket
服务器端,然后往控制台刷数据即可
注
必须将业务逻辑写在 creatingFunc
中,并返回 StreamingContext
,若定义在外面,恢复操作意味着重新创建了重新创建一次 StreamingContext
报 org.apache.spark.streaming.dstream.StateDStream@333398f has not been initialized
UpdateStateByKey(基于磁盘读写)
与 MapWithState(基于磁盘存储+缓存)
Spark Streaming 中状态管理函数包括updateStateBykey和mpaWithState,都是用来统计全局key的状态的变化的。它们以DStream中的
数据进行按key做reduce操作,然后对各个批次的数据进行累加,在有新的数据信息进入或更新时。
-
MapWithState
使用方式
- 如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值
- 指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在fun中实现,
必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,
则会报错。 - checkpoint不会必须的
-
UpdateStateByKey
使用方式
- 首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加。
- updataStateByKey要求必须设置checkpoint点(设置中间结果文件夹)
- updataStateByKey方法中updataFunc就要传入的参数,Seq[V]表示当前key对应的所有值,Option[S]是当前key的历史状态,返回的是新的封装的数据。
- 区别
-
updataeStateByKey
可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,
当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。 -
mapWithState
只返回变化后的key的值,这样做的好处是,我们可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。
这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。
重启时,如未使用 initialState 函数初始化值时,会导致初始化值为空, 有点类似 Redis 逻辑,其中也包含 timeout 逻辑
Kafka
Checkpoint 实现分析
在 sparkstreaming
中对 kafka
数据 offset 相关的内容直接能保存到 checkpoint
数据中,这样当 streaming 任务非程序 bug 出现异常宕之后,
可以通过 checkpoint
数据进行恢复状态
private[spark] class DirectKafkaInputDStream[K, V](
_ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
ppc: PerPartitionConfig
) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
...省略...
override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
logError("Kafka ConsumerRecord is not serializable. " +
"Use .map to extract fields before calling .persist or .window")
super.persist(newLevel)
}
...省略...
// 从实现了 DirectKafkaInputDStreamCheckpointData 来保存数据
protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData
// 定义 DStreamCheckpointData 数据内容
private[streaming]
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
...省略...
override def update(time: Time): Unit = {
batchForTime.clear()
generatedRDDs.foreach { kv =>
// 将 KafkaRDD 中的 offsetRanges 记录到 generateRDD checkpoint 中
val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
batchForTime += kv._1 -> a
}
}
...省略...
override def restore(): Unit = {
batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
// 将 generateRDD checkpoint 中的 offsetRanges 记录恢复为 KafkaRDD 实例
logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
generatedRDDs += t -> new KafkaRDD[K, V](
context.sparkContext,
executorKafkaParams,
b.map(OffsetRange(_)),
getPreferredHosts,
// during restore, it's possible same partition will be consumed from multiple
// threads, so do not use cache.
false
)
}
}
}
}
注
spark.cleaner.referenceTracking.cleanCheckpoints=true
可以定期删除过期的 checkpoint
文件夹
注
checkpoint
文件夹清理工作在 ContextCleaner
类中进行实现的。具体如下:
/**
* Clean up checkpoint files written to a reliable storage.
* Locally checkpointed files are cleaned up separately through RDD cleanups.
*/
def doCleanCheckpoint(rddId: Int): Unit = {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
listeners.asScala.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
}
}
网友评论