在实时数据流处理的场景下,可能会遇到各种与程序逻辑无关的故障,而导致运行程序中断或数据损失的可能性,因此在实际场景当中,很重要的一个点就是Checkpoint机制,这在很多流计算引擎当中都有。今天的大数据开发学习分享,我们就来讲讲Spark Streaming Checkpoint机制。
Checkpoint(检查点)
Spark Streaming的Checkpoint,有两种类型的检查点——
元数据检查点:
将定义流计算的信息保存到HDFS等容错存储系统中。这用于从运行流应用程序的驱动程序的节点的故障中恢复。元数据包括:
配置-用于创建流应用程序的配置。
DStream操作-定义流应用程序的DStream操作集。
不完整的批次-作业对列尚未完成的批次。
数据检查点:
将生成的RDD保存到可靠的存储中。在某些状态转换中对数据进行检查点是有必要的,这些转换将多个批次中的数据合并在一起。在这种转换中,生成的RDD依赖于先前批次的RDD,这导致依赖项链的长度随着时间而不断增加。为了避免恢复时间的无限增加(与依赖关系成正比)。状态转换的中间RDD定期检查点到可靠的存储系统中(例如HDFS),以切断依赖链。
总而言之,从驱动程序故障中恢复时,主要需要元数据检查点,而如果使用有状态转换,则即使是基本功能,也需要数据或RDD检查点。
何时启用Checkpoint?
通常来说,必须为具有以下任何要求的应用程序启用检查点:
有状态转换的用法:如果在应用程序中使用了updateStateByKey或reduceByKeyAndWindow(带有反函数),则必须提供检查点目录以允许定期的RDD检查点。
从运行应用程序的驱动程序故障中恢复:元数据检查点用于恢复的进度信息。
请注意,没有上述状态转换的简单流应用程序可以在不启用检查点的情况下运行。在这种情况下,从驱动程序故障中恢复也将是部分的(某些已接收但未处理的数据可能会丢失)。这通常是可以接受的,并且许多都以这种方式运行Spark Streaming应用程序。预计将来会改善对非Hadoop环境的支持。
如何配置Checkpoint?
可以通过在容错,可靠的文件系统(例如HDFS,S3等)中设置目录来启用检查点,将检查点信息保存到该目录中。通过使用streamingContext.checkpoint(checkpointDirectory)完成。此外,如果要使应用程序从驱动程序故障中恢复,则应重写流应用程序以具有以下行为:
程序首次启动时,它将创建一个新的StreamingContext,设置所有流,然后调用start()。
失败后重新启动程序时,它将根据检查点目录中的检查点数据重新创建StreamingContext。
通过使用StreamingContext.getOrCreate,可以简单完成。用法如下:
//Function to create and setup a new StreamingContext
def functionToCreateContext():StreamingContext={
val ssc=new StreamingContext(...)//new context
val lines=ssc.socketTextStream(...)//create DStreams
...
ssc.checkpoint(checkpointDirectory)//set checkpoint directory
ssc
}
//Get StreamingContext from checkpoint data or create a new one
val context=StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext_)
//Do additional setup on context that needs to be done,
//irrespective of whether it is being started or restarted
context....
//Start the context
context.start()
context.awaitTermination()
如果checkpointDirectory目录存在,则将根据检查点数据重新创建上下文。如果该目录不存在(即首次运行),则将调用函数functionToCreateContext来创建新上下文并设置DStreams。
除了使用getOrCreate之外,还需要确保驱动程序进程在发生故障时自动重新启动。这只能由用于运行应用程序的部署基础结构来完成。
注意,RDD的检查点会导致保存到可靠存储系统的成本。这可能会导致RDD获得检查点的那些批次的处理时间增加。因此,需要谨慎设置检查点的间隔时间。在小批次(例如间隔时间为1秒)时,每个批次的检查点可能会大大降低操作吞吐量。相反,检查点太少会导致血统和任务规模增加,这可能会产生不利影响。
对于需要RDD检查点的有状态转换,默认间隔为批处理间隔的倍数,至少应为10秒。可以使用dstream.checkpoint(checkpointInterval)进行设置。通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的尝试设置。
关于大数据开发学习,Spark Streaming Checkpoint机制,以上就为大家做了简单的介绍了。Spark Streaming作为非常流行的流计算引擎之一,Checkpoint机制是重要的保障机制,学习当中须注意。
网友评论