美文网首页Spark专题
SparkStream消费kafka消息delay,但job处理

SparkStream消费kafka消息delay,但job处理

作者: 凡尔Issac | 来源:发表于2017-09-29 15:08 被阅读97次

在场景系统中,通过SparkStream直接消费kafka数据,出现处理逻辑耗时在毫秒级,但是很多的job delay。

示例代码如下:

valbrokerList = ConfigLoader.get(KafkaConfig.runDataBrokerListKey)

valtopics = ConfigLoader.get(KafkaConfig.runDataTopic)

valtopicSet = topics.split(",").toSet

valkafkaParams =Map("metadata.broker.list"-> brokerList)

logger.info(s"设备运行数据kafka brokerList:$brokerList, topics:$topics")

valssc =newStreamingContext(sparkConf,Seconds(ConfigLoader.get(CommonConfig.batchDuration).toInt))

//采用直接消费的方式,每次只会消费最新的数据,对于当前实时业务适用

valdata = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)

data.map(dataParse).print()

代码逻辑非常简单,在dataParse方法中也只是进行json的解析,但是一个任务处理都达到4s,而SparkStream设置为了1s一个批次,从而导致越来越多的job等待,如下图:

从上图发现存在stage持续时间为4s,故查看其详细信息,发现当前stage存在很长时间的空闲,如下图:

故查看executor端日志,发现11:09:09完成task计算后到11:09:12期间,executor处于空闲状态,日志如下:

此时,追踪driver端日志,试图从driver发现当前任务在进行怎样的操作,发现这段一段日志:

17/09/29 11:09:12 DEBUG scheduler.TaskSetManager: Moving to RACK_LOCAL after waiting for 3000ms,在等待3000ms后移动到机架本地模式,继续追查当前stage启动时间,找到日志如下:

也就是说,在9~12这个时间点中,当前task都在进行一个等待操作,而超时间为3000ms,超时后执行了Moving to RACK_LOCAL操作,并检测到本地级别的机架本地没有任务,所以移动到Any级别。

追踪源码试图找出当前job进行了什么样的操作,定位到源码如下:

从else的判断条件可知,当(当前时间 - 最新Task启动时间) > 本地等待时间,即会答应当前log,继续追踪发现源码内容:

至此我们已经找到了上面task等待3s的原因,在设置sparkConf的时候,并没有设置当前三个参数,则取默认值,但是这个配置又是做什么的呢?

查找了相关资料,并向大牛请教后得到这样的解释:

spark在消费数据时,优先采用节点本地模式,即NODE_LOCAL(节点本地模式)>RACK_LOCAL(机架本地模式)>ANY(任意),这样在大数据量时可以做到减少网络io,每一批数据默认会等待三秒,如果三秒后数据所在节点上依旧没有启动task后,会修改为RACK_LOCAL,并且提交任务,失败后立马改为ANY模式。

而基于当前业务,SparkStream必须每1s处理一批数据,并且只给定了一个executor,所以大部分的节点上是不存在task的,如果每批数据等待节点本地启动task,这样会导致越来越多的job delay。故只能修改相关参数的默认值,跳过wait,直接将模式设置为ANY,修改代码如下:

valsparkConf =newSparkConf().setAppName("Scene")

sparkConf.set("spark.locality.wait.process","0")

sparkConf.set("spark.locality.wait.node","0")

sparkConf.set("spark.locality.wait.rack","0")

valbrokerList = ConfigLoader.get(KafkaConfig.runDataBrokerListKey)

valtopics = ConfigLoader.get(KafkaConfig.runDataTopic)

valtopicSet = topics.split(",").toSet

valkafkaParams =Map("metadata.broker.list"-> brokerList)

logger.info(s"设备运行数据kafka brokerList:$brokerList, topics:$topics")

valssc =newStreamingContext(sparkConf,Seconds(ConfigLoader.get(CommonConfig.batchDuration).toInt))

//采用直接消费的方式,每次只会消费最新的数据,对于当前实时业务适用

valdata = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)

data.map(dataParse).print()

问题圆满解决。

相关文章

  • SparkStream消费kafka消息delay,但job处理

    在场景系统中,通过SparkStream直接消费kafka数据,出现处理逻辑耗时在毫秒级,但是很多的job del...

  • Kafka 消费者心跳线程源码解析

    kafka消费者在消费消息时,分为心跳线程和用户线程(处理消息的线程) 消费消息poll方法 我们在第一次启动消费...

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • Pulsar的Key_Shared消费模式与Kafka的消费者再

    Kafka消费者再均衡 在Kafka中,一个分区只能有一个消费者,处于一个消费组的消费者,处理消息的时候是...

  • Apache Kafka学习笔记

    Apache Kafka Kafka是专为分布式高吞吐系统设计的生产-消费消息中间件,可以处理大量的数据,适合离线...

  • Spring boot 和 kafka的优雅集成

    在日常的开发工作中,kafka作为消息处理中间件,会经常在SpringBoot的应用中连接kafka,消费kakf...

  • Kafka 学习笔记

    一、Kafka简介 Kafka (科技术语)。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规...

  • MQ随记(2)

    如何保证消息不会被重复消费(保证消息消费时的幂等性) kafka 按照数据进入kafka的顺序,kafka会给每条...

  • Kafka_核心

    kafka集群 Kafka的设计都是为了实现kafak消息队列消费数据的语义Kafka消息队列中数据消费的三种语义...

  • 清华架构师熬夜整理,带你走进Kafka

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所在动作流数据。 Kafka基础 消息...

网友评论

    本文标题:SparkStream消费kafka消息delay,但job处理

    本文链接:https://www.haomeiwen.com/subject/pnuiextx.html