美文网首页
【Spark】SparkStreaming 新增Metrics

【Spark】SparkStreaming 新增Metrics

作者: PowerMe | 来源:发表于2017-10-10 15:20 被阅读479次

    在SparkStreaming任务运行的过程中,由于数据流量或者网络的抖动,任务的batch很可能出现delay,所以就出现了一个需求:实时监控任务对kafka消息的消费,及时了解堆积情况。

    这个需求应该有很多种解决方案,我这边提供的思路是基于Spark Metrics System的。SparkStreaming任务在运行过程中,会产生很多Metrics信息,不断地推送到Sink上面,我们这里使用到的是MetricsServlet。

    打开Spark UI,我们能够很方便地通过RestAPI方式请求任务Metrics信息,接口如下:

    http://ClusterHostName:8088/proxy/AppID/metrics/json

    返回的Metrics信息如下:

    • Metrics_json.png

    这里应用的方案就是在这些Metrics里面添加一个新Metrics,这个Metrics应该能够向监控应用程序提供任务batch对records的消费情况。

    我们知道,SparkStreaming应用消费Kafka数据有两种API:Reciever模式和Direct模式。所以针对使用的不同的API,需要提供不同的Metrics信息,其格式可以如下设置:

    • Reciever-Metrics
      kafka.consumer.$zkQuorum.$topic.$groupId
    • Direcct-Metrics: kafka.direct.$kafkaBrokerList.$topic.lastCompletedBatch_sumOffsets

    注意其中带“$”号 的为变量,需要根据实际情况赋值的,其它为常量字符串。
    上面两个Metrics我们使用registerGauge方法分别向MetricsSystem注册就可以了。

    根据上面Metrics的信息可以解读到,对于Reciever-Metrics,只向监控应用提供Kafka集群的连接信息,包括ZK,topics和groupId,注意对于多个topic的情况,要注册多个Metrics,然后需要监控应用自己调kafka的API去获取该consumer的offset和logsize,从而计算出堆积量;而对于Direcct-Metrics,需要Spark计算出每个batch消费的最新offset之和(实际上是计算消费的每个topic下所有partition的最新offset之和)。

    针对具体使用来说,首先根据应用创建DStream时传递给API的参数获取到
    1)对Reciever模式:zookeeper.connectgroup.idtopics
    2)对Direcct模式:metadata.broker.list或者bootstrap.serverstopics
    等信息,并将信息配置在StreamingContext新建的结构里面(以便于StreamingSource获取)。

    这样对于Reciever-Metrics来说,使用获取的信息构造对应的Metrics并注册,就可以了,对于value设置为0;对于Direcct-Metrics来说,需要在DirectKafkaInputDStream里面每一次compute计算时,将offsetRanges里面的元数据计算后推送到StreamingJobProgressListener里面(其中配置一个topic->sumOffsets的HashMap结构即可,每次compute向里面更新最新的计算结果)。最后在StreamingSource中registerGauge时根据topic就可以获取到sumOffset。

    实现下来需要修改的Spark源码文件可能包括:
    1)StreamingJobProgressListener.scala
    2)DirectKafkaInputDStream.scala
    3)KafkaInputDStream.scala
    4)StreamingSource.scala
    5)StreamingContext.scala

    相关文章

      网友评论

          本文标题:【Spark】SparkStreaming 新增Metrics

          本文链接:https://www.haomeiwen.com/subject/hgspyxtx.html