美文网首页
优雅关闭spark streaming job填坑之路

优雅关闭spark streaming job填坑之路

作者: A_You | 来源:发表于2018-11-16 17:32 被阅读0次

    业务相关代码已接近尾声,接下来要做下维护相关的工作,不可不提的就是spark streaming 优雅退出。

    集群环境&参数

       spark:  2.2
       kafka:  0.10.1
        mode:  spark on yarn cluster 
       offset: enable.auto.commit=false 
    

    driver kill

        ss -tanlp |  grep {driver port }|awk '{print $6}'|awk  -F, '{print $2}'    
        获取到driver pid进程号;然后 kill pid(不要使用kill -9)
    

    spark 提供的参数

       设置:spark.streaming.stopGracefullyOnShutdown=true
       实验效果:当前batch未进行完就直接退出, 有重复消费数据的问题;
       分析:  该参数在yarn cluster mode 下不起作用。 please refer:https://www.inovex.de/blog/247-spark-streaming-on-yarn-in-production/
    

    Add Shutdown Hook

       add hook code:
       Runtime.getRuntime().addShutdownHook(
                 new Thread(() -> {
                     logger.warn("start to stop gracefully .....................");
                     streamContext.stop(true,true);
                     logger.warn("right now to stop gracefully .....................");
                 })); 
      实验效果: 会等待当前batch 处理完毕;并且不再接受新的input;能达到效果
      问   题: 但是面临的一个问题最后一个batch能正常处理,但是由于 consumer 已经killed; 该 kafka topic 在 当前 group下发生啦reblance;导致偏移量提交失败。
      分   析:    提交偏移量代码如下,提交是一个异步的过程
         ((CanCommitOffsets)   stream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
                                System.err.println("onComplete:" + offsets.size());
                                System.err.println("is failed " + exception);
                                System.err.println("commit flag inner over .........." + offsetRanges[0].fromOffset() + "    " + offsetRanges[0].untilOffset());
    
                            });
                            System.err.println("commit flag outer over .........." + offsetRanges[0].fromOffset()  +"    "+offsetRanges[0].untilOffset());
             实验结果多次发现:回调函数内未有日志打印,  相反 "commit flag outer over" 该日志片段却有在打印。导致下次重新启动;当前batch又被重新处理
      改   进: 坚持以本次任务本次解决的宗旨,尝试在 hook code 中 再次提交失败的偏移量。
                 使用 原生 Kafka consumer API,依据缓存起来的 offsetRange[] 信息,再次提交偏移量;有以下方案:
             params:max.poll.records=1
             1:enable.auto.commit=false;consumer.commitSync();失败后异步提交:consumer.commitAsync(); 主要是kafka topic有在做 reblance的情况;
             2:enable.auto.commit=true; 自动提交偏移量
      目前使用使用的方案1;经过多次测试;可以满足需求
    

    add stop maker

       依赖于hdfs设置maker file,单独线程扫描;有该文件就使用stop(true,true)关闭服务

    相关文章

      网友评论

          本文标题:优雅关闭spark streaming job填坑之路

          本文链接:https://www.haomeiwen.com/subject/sbmmfqtx.html