美文网首页Spark专题
SparkStream消费kafka消息delay,但job处理

SparkStream消费kafka消息delay,但job处理

作者: 凡尔Issac | 来源:发表于2017-09-29 15:08 被阅读97次

    在场景系统中,通过SparkStream直接消费kafka数据,出现处理逻辑耗时在毫秒级,但是很多的job delay。

    示例代码如下:

    valbrokerList = ConfigLoader.get(KafkaConfig.runDataBrokerListKey)

    valtopics = ConfigLoader.get(KafkaConfig.runDataTopic)

    valtopicSet = topics.split(",").toSet

    valkafkaParams =Map("metadata.broker.list"-> brokerList)

    logger.info(s"设备运行数据kafka brokerList:$brokerList, topics:$topics")

    valssc =newStreamingContext(sparkConf,Seconds(ConfigLoader.get(CommonConfig.batchDuration).toInt))

    //采用直接消费的方式,每次只会消费最新的数据,对于当前实时业务适用

    valdata = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)

    data.map(dataParse).print()

    代码逻辑非常简单,在dataParse方法中也只是进行json的解析,但是一个任务处理都达到4s,而SparkStream设置为了1s一个批次,从而导致越来越多的job等待,如下图:

    从上图发现存在stage持续时间为4s,故查看其详细信息,发现当前stage存在很长时间的空闲,如下图:

    故查看executor端日志,发现11:09:09完成task计算后到11:09:12期间,executor处于空闲状态,日志如下:

    此时,追踪driver端日志,试图从driver发现当前任务在进行怎样的操作,发现这段一段日志:

    17/09/29 11:09:12 DEBUG scheduler.TaskSetManager: Moving to RACK_LOCAL after waiting for 3000ms,在等待3000ms后移动到机架本地模式,继续追查当前stage启动时间,找到日志如下:

    也就是说,在9~12这个时间点中,当前task都在进行一个等待操作,而超时间为3000ms,超时后执行了Moving to RACK_LOCAL操作,并检测到本地级别的机架本地没有任务,所以移动到Any级别。

    追踪源码试图找出当前job进行了什么样的操作,定位到源码如下:

    从else的判断条件可知,当(当前时间 - 最新Task启动时间) > 本地等待时间,即会答应当前log,继续追踪发现源码内容:

    至此我们已经找到了上面task等待3s的原因,在设置sparkConf的时候,并没有设置当前三个参数,则取默认值,但是这个配置又是做什么的呢?

    查找了相关资料,并向大牛请教后得到这样的解释:

    spark在消费数据时,优先采用节点本地模式,即NODE_LOCAL(节点本地模式)>RACK_LOCAL(机架本地模式)>ANY(任意),这样在大数据量时可以做到减少网络io,每一批数据默认会等待三秒,如果三秒后数据所在节点上依旧没有启动task后,会修改为RACK_LOCAL,并且提交任务,失败后立马改为ANY模式。

    而基于当前业务,SparkStream必须每1s处理一批数据,并且只给定了一个executor,所以大部分的节点上是不存在task的,如果每批数据等待节点本地启动task,这样会导致越来越多的job delay。故只能修改相关参数的默认值,跳过wait,直接将模式设置为ANY,修改代码如下:

    valsparkConf =newSparkConf().setAppName("Scene")

    sparkConf.set("spark.locality.wait.process","0")

    sparkConf.set("spark.locality.wait.node","0")

    sparkConf.set("spark.locality.wait.rack","0")

    valbrokerList = ConfigLoader.get(KafkaConfig.runDataBrokerListKey)

    valtopics = ConfigLoader.get(KafkaConfig.runDataTopic)

    valtopicSet = topics.split(",").toSet

    valkafkaParams =Map("metadata.broker.list"-> brokerList)

    logger.info(s"设备运行数据kafka brokerList:$brokerList, topics:$topics")

    valssc =newStreamingContext(sparkConf,Seconds(ConfigLoader.get(CommonConfig.batchDuration).toInt))

    //采用直接消费的方式,每次只会消费最新的数据,对于当前实时业务适用

    valdata = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)

    data.map(dataParse).print()

    问题圆满解决。

    相关文章

      网友评论

        本文标题:SparkStream消费kafka消息delay,但job处理

        本文链接:https://www.haomeiwen.com/subject/pnuiextx.html