美文网首页
spark structedStreaming是如何实现容错的

spark structedStreaming是如何实现容错的

作者: hongshen | 来源:发表于2020-02-24 23:52 被阅读0次

    sss如何实现eoc的

    spark structed Streaming简称sss,它主要还是采用微批的模式提供端到端的eoc(exactly-once)语义,要实现eoc,需要3方面保证,一个是可以replay的source,二是框架提供作业状态的持久化能力,三是sinker要实现幂等

    DataSource

    DataSource要replayable,就是指数据源可以追踪当前读取的位置,并且能够从上次失败的位置重新消费数据
    这两点可以保证能够从持久化的状态中恢复任务,比如apache kafka和Amazon Kinesis,kafka消费可以commit offset,可以根据offset seek到指定的位置开始消费;
    如果是quickstart中的socket数据源类型,它就不能replay,也就无法实现eoc

    追踪数据处理的点位主要依赖spark提供的checkpoint机制,checkpoint保存的信息主要是当前批次的数据源的点位等元数据信息

    StreamingQueryManager的startQuery和createQuery方法,将checkpoint的位置传给StreamExecution对象
    StreamExecution初始化org.apache.spark.sql.execution.streaming.OffsetSeqLog,这就是文档中提到的wal日志,从名字就可以猜测它的功能是顺序写数据源的点位信息,类似于数据库的事务日志哦,它记录了已经处理的每一批数据的点位信息,当前批次N在处理完之前,就先把点位信息写到OffsetSeqLog,第N批的点位写进OffsetSeqLog意味着第N-1批数据已经正确的交给了sinker

    org.apache.spark.sql.execution.streaming.MicroBatchExecution#constructNextBatch()
    我们来看这个代码片段

    updateStatusMessage("Writing offsets to log")
    reportTimeTaken("walCommit") {
      assert(offsetLog.add(
        currentBatchId,
        availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
        s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
        logInfo(s"Committed offsets for batch $currentBatchId. " +
        s"Metadata ${offsetSeqMetadata.toString}")
     
      // NOTE: The following code is correct because runStream() processes exactly one
      // batch at a time. If we add pipeline parallelism (multiple batches in flight at
      // the same time), this cleanup logic will need to change.
     
      // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
      // sources to discard data from the previous batch.
      if (currentBatchId != 0) {
        val prevBatchOff = offsetLog.get(currentBatchId - 1)
        if (prevBatchOff.isDefined) {
          prevBatchOff.get.toStreamProgress(sources).foreach {
            case (src: Source, off) => src.commit(off)
            case (reader: MicroBatchReader, off) =>
              reader.commit(reader.deserializeOffset(off.json))
          }
        } else {
          throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
        }
      }
       
      // It is now safe to discard the metadata beyond the minimum number to retain.
      // Note that purge is exclusive, i.e. it purges everything before the target ID.
      if (minLogEntriesToMaintain < currentBatchId) {
        offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
        commitLog.purge(currentBatchId - minLogEntriesToMaintain)
      }
    }
    

    作业恢复的时候,如果offset checkpoint文件存在,那么sss会解析当前批次的id,它对应的要处理的offsets,以及已经commit的N-1批次的offsets,

    这时候sss再检查这个第N批有没有处理完毕,即看一下最后commit的offset是不是跟N批的offset相同,如果相同,那么就会执行N+1新批次的处理,比如上次最后执行到1102批,那么解析出来发现1102批处理完成了,那么批次从1103批开始执行
    否则重做1102批次

    Data sinks

    sinker要幂等,同时sss还提供一个commitLog,它用来记录所有的已经完成的batch的id,跟offsetLog一样,这俩都可以retension


    处理流程示意图

    trigger一批之后,总是先写offsetlog,然后处理,处理完之后写commitlog

    前面提到了offset的错误恢复,显然,会有可能会重复消费一批数据进行处理,导致到达sinker的数据出现重复,eoc因此要求sinker必须实现幂等,sinker自己去重,比如写key -value类型的数据库,redis等,重复的数据并不影响结果

    sss的状态管理

    状态管理仅仅给应用提供了at-least once的支持,要实现eoc还需要sinker是幂等的,这跟flink的数据加barrier对齐后checkpoint不同,flink等于是框架层面实现了eoc,当然flink也要求sinker必须是幂等的,否则还是有可能有重复数据,比如kafka-fllink-kafka,因为写给kafka sinker的那一批结果,虽然不用flink重新计算,但是kafka如果没有开启事务,那么夸session的producer无法保证幂等,也就是不知道要写的结果数据,到底成功写进去了几条。

    spark的状态管理就是指持久化保存在checkpointLocation位置的那些数据

    包括四类
    1、数据源的信息,因为支持多种数据源,所有要知道你用了什么数据源
    2、数据源的offsetLog
    3、commitLog
    4、应用程序内部的状态(统计状态等,用户作业逻辑的状态,可能会非常多,非常大,比如你groupby url之类的试试)

    前3个状态信息,都是文本格式保存的,带有版本信息,防止被不同版本的spark处理,破坏元数据等

    故障恢复的流程

    正常处理流程是先commit offset,再处理,被sinker完全处理后 再写commitlog
    这种情况下,在恢复的时候,加载最后commit的offsetLog,然后通过commitLog判断那一批有没有被正确处理

    处理了,那么从新的offset开始处理,如果没有,那么,就要重新做

    应该是按照微批做原子更新的,如果本批次没有完成成功写入commitLog,是可以回滚的

    状态管理跟flink的区别

    整体上sss的这种状态管理跟flink类似,spark因为是微批,那么就可以做批的原子状态管理,flink因为是连续的流,所以必须用barrier机制同步各个算子的状态,也相当于利用barrier实现了微批,只不过flink的微批是“异步的”,就是不用等你这一批执行完,就可以不断的执行下一批,而sss的是“同步的”,显然异步的似乎效率更高一些,但是checkpoint如果太频繁,频繁等待barrier一致的话,也会有很多算子干等着啥也不干,协调效率可能需要根据场景自行调优

    相关文章

      网友评论

          本文标题:spark structedStreaming是如何实现容错的

          本文链接:https://www.haomeiwen.com/subject/rexjqhtx.html