Spark运维

作者: Mr_Qifei | 来源:发表于2019-06-14 13:27 被阅读0次

    spark on yarn运维

    场景:对于集群故障和集群配置调整可能对spark实时任务造成的影响及所需采取措施进行记录,并制定对应解决方案。
    方案总结:针对下文总结的实时任务存在的隐患,可对hadoop集群配置进行以下调整优化(优化方案需先在测试集群上测试):

    • 增加yarn AM进程的容错重试次数,默认只有一次,调大后可增加长期在yarn上运行的任务的稳定性。
    • 保留RM、NM进程的运行时状态数据,可在rm和nm进程遇故障或者需要重启时使yarn集群稳定过渡。

    一、磁盘故障对spark任务的影响

    1、如果是application master所在机器磁盘发生故障:

    • 取决于spark application master是否用到该磁盘存储数据:如果没有用到则对spark任务无影响。
    • 如果am进程用到故障磁盘,则会触发Yarn容错机制,ResourceManager会启动新的AM进程,并重新启动所有executor进程,所有旧的executor全部停止,只保留日志。

    由于目前yarn参数默认配置,am进程只有一次容错重启的机会,新am进程再出问题则spark任务会down掉。

    2、如果是普通的executor所在机器磁盘发生故障:

    同样取决于该executor是否用到该磁盘存储数据,如果没用到则无影响。
    如果用到该磁盘,则该节点executor日志持续报错,但executor进程还在;spark任务其他节点上executor进程仍正常执行。

    针对磁盘故障问题的可用措施:由于目前已经对磁盘健康状态做了监控,但是由于spark任务通常需要长期运行,不可避免会遇到磁盘故障等硬件或者其他问题,可考虑将am容错重试进行调整,增加am容错次数。

    二、NodeManager进程故障对spark任务的影响

    1、application master所在nm进程故障:

    • spark任务进程一定会down!
      措施:需立即重启spark任务。

    2、executor所在NM进程故障:

    • 该NM进程所在节点的executor进程down,其他NM节点executor进程正常执行!
    • 超过120秒没有响应,被ResourceManager和AM进程确认 Slave lost 和 Lost executor。
    • 10分钟后在健康的NM节点上启动新的Executor进程。
      不影响spark实时任务正常工作。

    三、DataNode进程重启对spark任务的影响

    • 如果spark实时任务不需要用到HDFS上的数据,则不受影响。
    • 如果Spark任务在读取hdfs上数据,可能会导致部分executor出错,严重时会使spark整体进程down掉。

    四、ResourceManager重启对spark任务影响

    场景:如果增加RM HA配置,那么修改完配置需要重启rm和nm进程。此时对线上执行的任务有何影响?

    • RM进程down掉后,spark实时任务正常状态下不受影响继续运行(除非触发容错机制或者其他需要跟RM交互的行为时任务将受到影响)。
    • 20分钟后,由于nm进程始终无法与rm进程取得连接,将停掉正在执行的任务和自身NM进程。
    • RM进程上线时,由于之前RM进程数据丢失,所有线上任务都未在新RM进程中注册,会全部被RM进程kill掉。

    目前如果重启ResourceManager进程,线上spark不可避免将被影响。

    参考建议:修改yarn配置,存储RM、NM进程运行时状态等,重启时减小对线上任务产生的影响。



    开发及运维

    1、spark实时任务运维

    如遇yarn集群故障,集群恢复后一般重启spark任务即可。

    2、Spark任务开发注意事项及优化点

    1. 调整spark任务的资源配置并启动时(如executor数量、core数量等),需先将checkpoint删除,否则任务启动后仍会按照旧的资源配置申请资源。
    2. 复用RDD、使用高性能算子、广播大变量、RDD持久化…
    3. 使用高性能序列化框架Kryo优化序列化性能,比spark默认java序列化机制性能高10倍。
    4. 分配合理的资源参数:num-executors、executor-memory、executor-cores、spark.default.parallelism、spark.storage.memoryFraction、spark.shuffle.memoryFraction……
    5. spark任务数据倾斜优化、shuffle参数优化……

    具体优化细节不展开记录,可参考网上spark优化相关文章:https://www.jianshu.com/p/67606a11415b

    3、问题记录

    spark streaming重启时候,如果不清空checkpoint启动就会报null point的错误,并启动失败。

    原因:这个实时任务用到Spark SQL中的sparksession来操作sql, 但是sparksession是不能被序列化的,所以不能在checkpoint中保存。重启后操作sql时候会因为sparksession在checkpoint反序列为空而报错,所以需要sparksession在foreachRdd中定义,每次执行都去重新获取。

    解决:

     lines.foreachRDD(rdd => {
          //获取单实例SparkSession
          val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
          import spark.implicits._
          
          ……
     }
    

    再额外定义一个获取sparksession的类。

    /** Lazily instantiated singleton instance of SparkSession */
    object SparkSessionSingleton {
    
      @transient  private var instance: SparkSession = _
    
      def getInstance(sparkConf: SparkConf): SparkSession = {
        if (instance == null) {
          instance = SparkSession
            .builder
            .config(sparkConf)
            .getOrCreate()
        }
        instance
      }
    }
    

    相关文章

      网友评论

        本文标题:Spark运维

        本文链接:https://www.haomeiwen.com/subject/jpwufctx.html