美文网首页
sparkStreaming拉取kafka010的bug

sparkStreaming拉取kafka010的bug

作者: 码仙丶 | 来源:发表于2018-12-14 19:02 被阅读0次
    • 目前kafka版本中,很多公司在用kafka010,但是在用sparkstreaming去消费kafka时,很多人都会碰到一个错误
    Caused by: java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-abcd1 test1 8 1 after polling for 512
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    
    • 解决步骤
    1. 根据报错信息
    kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
    
    • 可以看出是在这里跑出的错误,进入CachedKafkaConsumer.scala类,找到74行
    if (!buffer.hasNext()) { poll(timeout) }
    assert(buffer.hasNext(),
          s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    var record = buffer.next()
    
    • 继续进入poll(timeout)方法
    #如果经过timeout的时间都没有拉取到数据,那么poll会返回一个空的消息集合,这里是kafka consumer的设计
    private def poll(timeout: Long): Unit = {
        val p = consumer.poll(timeout)
        val r = p.records(topicPartition)
        logDebug(s"Polled ${p.partitions()}  ${r.size}")
        buffer = r.iterator
      }
    
    • 可以看出这里是用poll去拉取kafka数据放到buffer
    #这是buffer的声明
    protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
    
    • 在回到
    #这里判断buffer是否有数据,如果没有数据就去poll,但是当poll也没有拉到数据
    if (!buffer.hasNext()) { poll(timeout) }
    #这里spark断言buffer里肯定有数据,如果没有就会抛出上面那个错误
    assert(buffer.hasNext(),
          s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    var record = buffer.next()
    
    • 其实解决方法很简单
      KafkaRDD.scala
    #这里spark设置的默认poll的timeout是512毫秒,如果512毫秒后没拉到数据就会抛异常
    private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
    
    • 只需将这个参数在session中设置大于512就可以了,具体设置多少要根据实际情况看,例如:
    sparkConf.set("spark.streaming.kafka.consumer.poll.ms", 10000)
    

    在kafka consumer API中如果poll(timeout)没有拉到数据会返回空的消息集合
    但是spark在这里抛了异常

    相关文章

      网友评论

          本文标题:sparkStreaming拉取kafka010的bug

          本文链接:https://www.haomeiwen.com/subject/nkxnhqtx.html