spark on yarn运维
场景:对于集群故障和集群配置调整可能对spark实时任务造成的影响及所需采取措施进行记录,并制定对应解决方案。
方案总结:针对下文总结的实时任务存在的隐患,可对hadoop集群配置进行以下调整优化(优化方案需先在测试集群上测试):
- 增加yarn AM进程的容错重试次数,默认只有一次,调大后可增加长期在yarn上运行的任务的稳定性。
- 保留RM、NM进程的运行时状态数据,可在rm和nm进程遇故障或者需要重启时使yarn集群稳定过渡。
一、磁盘故障对spark任务的影响
1、如果是application master所在机器磁盘发生故障:
- 取决于spark application master是否用到该磁盘存储数据:如果没有用到则对spark任务无影响。
- 如果am进程用到故障磁盘,则会触发Yarn容错机制,ResourceManager会启动新的AM进程,并重新启动所有executor进程,所有旧的executor全部停止,只保留日志。
由于目前yarn参数默认配置,am进程只有一次容错重启的机会,新am进程再出问题则spark任务会down掉。
2、如果是普通的executor所在机器磁盘发生故障:
同样取决于该executor是否用到该磁盘存储数据,如果没用到则无影响。
如果用到该磁盘,则该节点executor日志持续报错,但executor进程还在;spark任务其他节点上executor进程仍正常执行。
针对磁盘故障问题的可用措施:由于目前已经对磁盘健康状态做了监控,但是由于spark任务通常需要长期运行,不可避免会遇到磁盘故障等硬件或者其他问题,可考虑将am容错重试进行调整,增加am容错次数。
二、NodeManager进程故障对spark任务的影响
1、application master所在nm进程故障:
- spark任务进程一定会down!
措施:需立即重启spark任务。
2、executor所在NM进程故障:
- 该NM进程所在节点的executor进程down,其他NM节点executor进程正常执行!
- 超过120秒没有响应,被ResourceManager和AM进程确认 Slave lost 和 Lost executor。
- 10分钟后在健康的NM节点上启动新的Executor进程。
不影响spark实时任务正常工作。
三、DataNode进程重启对spark任务的影响
- 如果spark实时任务不需要用到HDFS上的数据,则不受影响。
- 如果Spark任务在读取hdfs上数据,可能会导致部分executor出错,严重时会使spark整体进程down掉。
四、ResourceManager重启对spark任务影响
场景:如果增加RM HA配置,那么修改完配置需要重启rm和nm进程。此时对线上执行的任务有何影响?
- RM进程down掉后,spark实时任务正常状态下不受影响继续运行(除非触发容错机制或者其他需要跟RM交互的行为时任务将受到影响)。
- 20分钟后,由于nm进程始终无法与rm进程取得连接,将停掉正在执行的任务和自身NM进程。
- RM进程上线时,由于之前RM进程数据丢失,所有线上任务都未在新RM进程中注册,会全部被RM进程kill掉。
目前如果重启ResourceManager进程,线上spark不可避免将被影响。
参考建议:修改yarn配置,存储RM、NM进程运行时状态等,重启时减小对线上任务产生的影响。
开发及运维
1、spark实时任务运维
如遇yarn集群故障,集群恢复后一般重启spark任务即可。
2、Spark任务开发注意事项及优化点
- 调整spark任务的资源配置并启动时(如executor数量、core数量等),需先将checkpoint删除,否则任务启动后仍会按照旧的资源配置申请资源。
- 复用RDD、使用高性能算子、广播大变量、RDD持久化…
- 使用高性能序列化框架Kryo优化序列化性能,比spark默认java序列化机制性能高10倍。
- 分配合理的资源参数:num-executors、executor-memory、executor-cores、spark.default.parallelism、spark.storage.memoryFraction、spark.shuffle.memoryFraction……
- spark任务数据倾斜优化、shuffle参数优化……
具体优化细节不展开记录,可参考网上spark优化相关文章:https://www.jianshu.com/p/67606a11415b
3、问题记录
spark streaming重启时候,如果不清空checkpoint启动就会报null point的错误,并启动失败。
原因:这个实时任务用到Spark SQL中的sparksession来操作sql, 但是sparksession是不能被序列化的,所以不能在checkpoint中保存。重启后操作sql时候会因为sparksession在checkpoint反序列为空而报错,所以需要sparksession在foreachRdd中定义,每次执行都去重新获取。
解决:
lines.foreachRDD(rdd => {
//获取单实例SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
……
}
再额外定义一个获取sparksession的类。
/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
网友评论