美文网首页
spark checkpoint 分析及用法

spark checkpoint 分析及用法

作者: 走在成长的道路上 | 来源:发表于2020-12-28 21:03 被阅读0次

    Spark Streamingcheckpoint 机制

    Spark Streaming 若需要 7 * 24 不间断的运行,必须对诸如系统错误, JVM 错误等程序逻辑无关的错误 (Failures) 导致 Driver 所在
    的节点错误,具备一定的非应用程序出错的容错性。Spark Streamingcheckpoint 机制便是为此设计,它将足够多的信息 checkpoint
    到某些具备容错性的存储系统如 hdfs 上,以便出错时能迅速恢复。

    有两种数据可以进行 checkpoint 保存:

    • Metadata Checkpoint

      配置信息: SparkConf , DStream 操作

    • Data Checkpoint

      checkpoint 来自有状态转化操作的 DStream

    即,Metadata Checkpoint 主要时从 driver 失败中恢复,而 Data Checkpoint 用于对有状态的操作进行 Checkpoint 处理。

    使用 Checkpoint

    RDDcheckpoint 一样,需要从 SparkStreaming 入口设置检查点,即通过 StreamingContext.checkpoint(...) 设置 checkpoint
    缓存路径,一般是缓存到 hdfs 上,同时, checkpoint 还需要满足两个条件:

    • 若第一次启动,将创建新的 StreamingContext
    • 若从失败中恢复,将从 checkpoint 中恢复上次的 StreamingContext, 恢复之后的 DAG 仅含有依赖关系,并非原来的线性关系

    因此,在创建 StreamingContext 是需要使用另一种方案 StreamingContext.getOrCreate(), 其中 getOrCreate() 源码如下:

    def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration = SparkHadoopUtil.get.conf,
      createOnError: Boolean = false
    ): StreamingContext = {
    // 读取 checkpoint 数据
    val checkpointOption = CheckpointReader.read(
      checkpointPath, new SparkConf(), hadoopConf, createOnError)
    // 创建 StreamingContext 函数,不存在则需要使用 creatingFunc 函数创建新实例
    checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
    }
    

    实例如下:

    // 创建 SparkConf 属性配置内容
    private val conf: SparkConf = new SparkConf()
                                .setMaster("local[*]")
                                .setAppName("SparkStreaming")
                                // 优雅的关闭程序,保证driver结束前处理完所有已接收的数据
                                .set("spark.streaming.stopGracefullyOnShutdown", "true")
    private val path = "checkpoint"
    
    // 创建测试用例
    def testCheckpoint(): Unit = {
        // 获取或创建 StreamingContext 实例
        // 必须将业务逻辑写在 `creatingFunc` 中,并返回 `StreamingContext`,
        // 若定义在外面,恢复操作意味着重新创建了重新创建一次 `StreamingContext` 
        // 因此会报异常 StateDStream@333398f has not been initialized
        val ssc = StreamingContext.getOrCreate(path, () => {
            // 创建 StreamingContext 新实例
            val ssc = new StreamingContext(conf, Seconds(5))
            // 设置 checkpoint 存储路径
            ssc.checkpoint(path)
            // 业务逻辑
            val wc = ssc.socketTextStream("127.0.0.1", 9999)
            .flatMap(_.split(" "))
            .map((_, 1))
            .updateStateByKey[Int]((seq: Seq[Int], op: Option[Int]) => Option(seq.sum + op.getOrElse(0)))
            // 设置检查点时间
            wc.checkpoint(Seconds(50))
            wc.print()
            ssc
        })
        // 启动 Streaming 逻辑
        ssc.start()
        ssc.awaitTermination()
    }
    

    在上述测试中,使用 nc -lk 9999 即可开启一个简易的 socket 服务器端,然后往控制台刷数据即可

    必须将业务逻辑写在 creatingFunc 中,并返回 StreamingContext,若定义在外面,恢复操作意味着重新创建了重新创建一次 StreamingContext
    org.apache.spark.streaming.dstream.StateDStream@333398f has not been initialized

    UpdateStateByKey(基于磁盘读写)MapWithState(基于磁盘存储+缓存)

    Spark Streaming 中状态管理函数包括updateStateBykey和mpaWithState,都是用来统计全局key的状态的变化的。它们以DStream中的
    数据进行按key做reduce操作,然后对各个批次的数据进行累加,在有新的数据信息进入或更新时。

    1. MapWithState 使用方式
    • 如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值
    • 指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在fun中实现,
      必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,
      则会报错。
    • checkpoint不会必须的
    1. UpdateStateByKey 使用方式
    • 首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加。
    • updataStateByKey要求必须设置checkpoint点(设置中间结果文件夹)
    • updataStateByKey方法中updataFunc就要传入的参数,Seq[V]表示当前key对应的所有值,Option[S]是当前key的历史状态,返回的是新的封装的数据。
    1. 区别
    • updataeStateByKey
      可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,
      当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。

    • mapWithState
      只返回变化后的key的值,这样做的好处是,我们可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。
      这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。
      重启时,如未使用 initialState 函数初始化值时,会导致初始化值为空, 有点类似 Redis 逻辑,其中也包含 timeout 逻辑

    Kafka Checkpoint 实现分析

    sparkstreaming 中对 kafka 数据 offset 相关的内容直接能保存到 checkpoint 数据中,这样当 streaming 任务非程序 bug 出现异常宕之后,
    可以通过 checkpoint 数据进行恢复状态

    private[spark] class DirectKafkaInputDStream[K, V](
        _ssc: StreamingContext,
        locationStrategy: LocationStrategy,
        consumerStrategy: ConsumerStrategy[K, V],
        ppc: PerPartitionConfig
      ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
      
      ...省略...
      
      override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
        logError("Kafka ConsumerRecord is not serializable. " +
          "Use .map to extract fields before calling .persist or .window")
        super.persist(newLevel)
      }
    
      ...省略...
      // 从实现了 DirectKafkaInputDStreamCheckpointData 来保存数据
      protected[streaming] override val checkpointData =
        new DirectKafkaInputDStreamCheckpointData
    
      // 定义 DStreamCheckpointData 数据内容
      private[streaming]
      class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
    
      ...省略...
    
        override def update(time: Time): Unit = {
          batchForTime.clear()
          generatedRDDs.foreach { kv =>
            // 将 KafkaRDD 中的 offsetRanges 记录到 generateRDD checkpoint 中
            val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
            batchForTime += kv._1 -> a
          }
        }
    
      ...省略...
      
        override def restore(): Unit = {
          batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
            // 将 generateRDD checkpoint 中的 offsetRanges 记录恢复为 KafkaRDD 实例
             logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
             generatedRDDs += t -> new KafkaRDD[K, V](
               context.sparkContext,
               executorKafkaParams,
               b.map(OffsetRange(_)),
               getPreferredHosts,
               // during restore, it's possible same partition will be consumed from multiple
               // threads, so do not use cache.
               false
             )
          }
        }
      }
    
    }
    

    spark.cleaner.referenceTracking.cleanCheckpoints=true 可以定期删除过期的 checkpoint 文件夹

    checkpoint 文件夹清理工作在 ContextCleaner 类中进行实现的。具体如下:

      /**
       * Clean up checkpoint files written to a reliable storage.
       * Locally checkpointed files are cleaned up separately through RDD cleanups.
       */
      def doCleanCheckpoint(rddId: Int): Unit = {
        try {
          logDebug("Cleaning rdd checkpoint data " + rddId)
          ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
          listeners.asScala.foreach(_.checkpointCleaned(rddId))
          logInfo("Cleaned rdd checkpoint data " + rddId)
        }
        catch {
          case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
        }
      }
    

    相关文章

      网友评论

          本文标题:spark checkpoint 分析及用法

          本文链接:https://www.haomeiwen.com/subject/nuotoktx.html