业务相关代码已接近尾声,接下来要做下维护相关的工作,不可不提的就是spark streaming 优雅退出。
集群环境&参数
spark: 2.2
kafka: 0.10.1
mode: spark on yarn cluster
offset: enable.auto.commit=false
driver kill
ss -tanlp | grep {driver port }|awk '{print $6}'|awk -F, '{print $2}'
获取到driver pid进程号;然后 kill pid(不要使用kill -9)
spark 提供的参数
设置:spark.streaming.stopGracefullyOnShutdown=true
实验效果:当前batch未进行完就直接退出, 有重复消费数据的问题;
分析: 该参数在yarn cluster mode 下不起作用。 please refer:https://www.inovex.de/blog/247-spark-streaming-on-yarn-in-production/
Add Shutdown Hook
add hook code:
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
logger.warn("start to stop gracefully .....................");
streamContext.stop(true,true);
logger.warn("right now to stop gracefully .....................");
}));
实验效果: 会等待当前batch 处理完毕;并且不再接受新的input;能达到效果
问 题: 但是面临的一个问题最后一个batch能正常处理,但是由于 consumer 已经killed; 该 kafka topic 在 当前 group下发生啦reblance;导致偏移量提交失败。
分 析: 提交偏移量代码如下,提交是一个异步的过程
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
System.err.println("onComplete:" + offsets.size());
System.err.println("is failed " + exception);
System.err.println("commit flag inner over .........." + offsetRanges[0].fromOffset() + " " + offsetRanges[0].untilOffset());
});
System.err.println("commit flag outer over .........." + offsetRanges[0].fromOffset() +" "+offsetRanges[0].untilOffset());
实验结果多次发现:回调函数内未有日志打印, 相反 "commit flag outer over" 该日志片段却有在打印。导致下次重新启动;当前batch又被重新处理
改 进: 坚持以本次任务本次解决的宗旨,尝试在 hook code 中 再次提交失败的偏移量。
使用 原生 Kafka consumer API,依据缓存起来的 offsetRange[] 信息,再次提交偏移量;有以下方案:
params:max.poll.records=1
1:enable.auto.commit=false;consumer.commitSync();失败后异步提交:consumer.commitAsync(); 主要是kafka topic有在做 reblance的情况;
2:enable.auto.commit=true; 自动提交偏移量
目前使用使用的方案1;经过多次测试;可以满足需求
add stop maker
依赖于hdfs设置maker file,单独线程扫描;有该文件就使用stop(true,true)关闭服务
网友评论