Spark Kafka direct-stream mode: source code notes

Author: 邵红晓 | Published 2020-06-28 12:44

Kafka consumer: Assign & Subscribe

stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  ConsumerStrategies.Assign[String, String](
    fromOffsets.asJava.keySet(),
    kafkaParams.asJava,
    fromOffsets.asJava))

On the first call (no saved offsets), Subscribe is used instead:

stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))
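For Assign, the caller must supply the starting offset for every partition. Below is a minimal sketch of building that `fromOffsets` map; `TP` is a stand-in for Kafka's `TopicPartition` so the sketch runs without the Kafka client on the classpath, and the topic name and offset values are assumptions:

```scala
// Stand-in for org.apache.kafka.common.TopicPartition (illustrative only).
case class TP(topic: String, partition: Int)

object FromOffsetsSketch {
  // Starting offsets, e.g. read back from ZooKeeper/HBase/Redis on restart (values assumed).
  def fromOffsets: Map[TP, Long] = Map(
    TP("events", 0) -> 42L,  // resume partition 0 at offset 42
    TP("events", 1) -> 7L    // resume partition 1 at offset 7
  )

  def main(args: Array[String]): Unit = {
    // Assign receives the partition set plus the per-partition start offsets.
    val partitions = fromOffsets.keySet
    println(s"${partitions.size} partitions, p0 starts at ${fromOffsets(TP("events", 0))}")
  }
}
```

In the real API the keys would be `TopicPartition` instances and the map would be converted with `.asJava` before being passed to `ConsumerStrategies.Assign`.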

KafkaRDD#CachedKafkaConsumer

// Iteration stops once the partition's end offset (exclusive) is reached.
override def hasNext(): Boolean = requestOffset < part.untilOffset

override def next(): ConsumerRecord[K, V] = {
      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
      val r = consumer.get(requestOffset, pollTimeout)  // fetch via the cached consumer
      requestOffset += 1
      r
}
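The same bounded-iterator contract can be sketched without Spark or Kafka: `requestOffset` walks the half-open range `[fromOffset, untilOffset)`, so the partition yields exactly `untilOffset - fromOffset` records. All names below are illustrative stand-ins for the real `KafkaRDD` iterator fields:

```scala
// Minimal stand-in for a fetched Kafka record (illustrative only).
case class Record(offset: Long, value: String)

// Sketch of KafkaRDD's per-partition iterator over the range [fromOffset, untilOffset).
class PartitionIterator(fromOffset: Long, untilOffset: Long) extends Iterator[Record] {
  private var requestOffset = fromOffset

  override def hasNext: Boolean = requestOffset < untilOffset

  override def next(): Record = {
    assert(hasNext, "Can't call next() once untilOffset has been reached")
    val r = Record(requestOffset, s"msg-$requestOffset") // consumer.get(...) in the real code
    requestOffset += 1
    r
  }
}

object IteratorSketch {
  def offsets(from: Long, until: Long): List[Long] =
    new PartitionIterator(from, until).map(_.offset).toList
}
```

Because the upper bound is exclusive, a batch whose offset range is `[5, 8)` delivers offsets 5, 6 and 7, and an empty range delivers nothing.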

 def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
    logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
    if (offset != nextOffset) {
      // Non-sequential request: reposition the consumer and refill the buffer.
      logInfo(s"Initial fetch for $groupId $topic $partition $offset")
      seek(offset)
      poll(timeout)
    }
    // Sequential reads are served from the prefetch buffer without another seek.
    if (!buffer.hasNext()) { poll(timeout) }
    assert(buffer.hasNext(), s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
    val record = buffer.next()
    nextOffset = offset + 1
    record
  }
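The point of the `nextOffset` check is that sequential reads hit the consumer's prefetch buffer, and only a non-sequential request pays for a seek plus poll round trip. A sketch with a counter in place of a real `KafkaConsumer` (all names here are assumptions, not the actual Spark class):

```scala
// Sketch of CachedKafkaConsumer's sequential-read fast path (stand-in, no real Kafka).
class CachedConsumerSketch {
  private var nextOffset: Long = -2L // sentinel: nothing fetched yet
  var seeks: Int = 0                 // counts expensive seek+poll round trips

  def get(offset: Long): Long = {
    if (offset != nextOffset) {      // non-sequential request: must reposition
      seeks += 1                     // seek(offset); poll(timeout) in the real code
    }
    nextOffset = offset + 1          // a sequential follow-up will skip the seek
    offset                           // stands in for the fetched record
  }
}
```

Reading offsets 0, 1, 2 in order costs a single repositioning; jumping ahead to offset 10 costs a second one. This is why each cached consumer is dedicated to one topic-partition.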

Summary

Kafka's native consumer API offers two subscription modes: Assign and Subscribe. With Assign, the application pins specific topic-partitions and starting offsets itself; `group.id` has no effect, and offset management is entirely the application's responsibility. With Subscribe, the Kafka broker assigns topic-partitions to consumers automatically and no starting offsets need to be specified; here `group.id` does take effect: consumers in the same group compete for partitions, each partition is consumed by only one group member, so no message is delivered twice within the group.
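The group semantics above can be illustrated with a toy round-robin assignment: each partition goes to exactly one consumer in the group, so the union of all assignments covers every partition exactly once. This is a simplified model, not Kafka's actual rebalance protocol:

```scala
object GroupAssignSketch {
  // Toy round-robin assignment of partitions to the members of one consumer group.
  // Each partition index lands on exactly one consumer, so there is no duplicate delivery.
  def assign(partitions: Seq[Int], consumers: Seq[String]): Map[String, Seq[Int]] =
    partitions.zipWithIndex
      .groupBy { case (_, i) => consumers(i % consumers.size) }
      .map { case (consumer, ps) => consumer -> ps.map(_._1) }
}
```

For four partitions and two consumers, each consumer ends up with two partitions and every partition appears in exactly one consumer's list.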
