Kafka Assign & Subscribe
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  ConsumerStrategies.Assign[String, String](
    fromOffsets.asJava.keySet(),
    kafkaParams.asJava,
    fromOffsets.asJava))
First call:
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))
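The practical difference between the two strategies is who decides the starting offsets. This can be sketched with a toy model in plain Scala; none of the names below are the real Spark/Kafka API (the real `TopicPartition`, `Assign`, and `Subscribe` live in kafka-clients and spark-streaming-kafka), they only model the contract:

```scala
// Toy model of the two consumer strategies -- NOT the real Kafka/Spark API.
// Assign: the caller supplies explicit starting offsets; the group's
//   committed offsets are ignored.
// Subscribe: partitions are assigned broker-side and consumption resumes
//   from the group's committed offsets (simulated here, defaulting to 0).
case class TopicPartition(topic: String, partition: Int)

sealed trait Strategy {
  def startingOffsets(committed: Map[TopicPartition, Long]): Map[TopicPartition, Long]
}

// Assign ignores whatever the group has committed.
case class Assign(fromOffsets: Map[TopicPartition, Long]) extends Strategy {
  def startingOffsets(committed: Map[TopicPartition, Long]) = fromOffsets
}

// Subscribe starts from the group's committed position (0 if none yet).
case class Subscribe(topics: Set[String], partitionsPerTopic: Int) extends Strategy {
  def startingOffsets(committed: Map[TopicPartition, Long]) = {
    val tps = for (t <- topics; p <- 0 until partitionsPerTopic)
      yield TopicPartition(t, p)
    tps.map(tp => tp -> committed.getOrElse(tp, 0L)).toMap
  }
}
```

With `Assign`, even if the group has committed offset 7, consumption starts at the offset the caller passed in; with `Subscribe`, the committed offset wins.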
KafkaRDD#CachedKafkaConsumer
override def hasNext(): Boolean = requestOffset < part.untilOffset

override def next(): ConsumerRecord[K, V] = {
  assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
  val r = consumer.get(requestOffset, pollTimeout)
  requestOffset += 1
  r
}
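The iterator contract above — consume exactly the half-open offset range [fromOffset, untilOffset) and advance one record per `next()` — can be reproduced in plain Scala. This is a minimal sketch in which an in-memory sequence stands in for the broker and indices play the role of offsets:

```scala
// Minimal sketch of KafkaRDD's per-partition iterator: hasNext is a pure
// offset comparison, and next() fetches exactly one record then advances.
// `records` stands in for the broker; indices play the role of offsets.
class OffsetRangeIterator[A](records: IndexedSeq[A], fromOffset: Long, untilOffset: Long)
    extends Iterator[A] {
  private var requestOffset = fromOffset

  override def hasNext: Boolean = requestOffset < untilOffset

  override def next(): A = {
    assert(hasNext, "Can't call next() once untilOffset has been reached")
    // In the real code this is consumer.get(requestOffset, pollTimeout).
    val r = records(requestOffset.toInt)
    requestOffset += 1
    r
  }
}
```

Because `hasNext` is just a comparison against `untilOffset`, the number of records each task consumes is fixed when the offset range is computed, before any data is fetched.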
def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
  logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
  if (offset != nextOffset) {
    logInfo(s"Initial fetch for $groupId $topic $partition $offset")
    seek(offset)
    poll(timeout)
  }
  // ... (rest of the method elided)
}
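The point of `CachedKafkaConsumer.get` is that a seek + poll only happens when the requested offset breaks the sequential pattern; sequential reads are served from an internal buffer filled by a previous poll. The simulation below is a toy stand-in (all names hypothetical, not Spark's implementation) that counts polls to show the amortization:

```scala
// Toy CachedKafkaConsumer: poll() fills a buffer of up to `batchSize`
// records, and get(offset) only re-seeks/polls when the offset is not the
// next sequential one, or when the buffer is drained. The `polls` counter
// shows that sequential access amortizes fetches across many get() calls.
class CachedConsumerSketch(log: IndexedSeq[String], batchSize: Int) {
  private var nextOffset = -1L
  private var buffer: Iterator[(Long, String)] = Iterator.empty
  var polls = 0

  private def poll(from: Long): Unit = {
    polls += 1
    buffer = log.zipWithIndex
      .map { case (v, i) => (i.toLong, v) }
      .drop(from.toInt).take(batchSize).iterator
  }

  def get(offset: Long): String = {
    if (offset != nextOffset) poll(offset) // out-of-order request: seek + poll
    if (!buffer.hasNext) poll(offset)      // buffer drained: poll again
    val (o, v) = buffer.next()
    nextOffset = o + 1
    v
  }
}
```

Reading offsets 0 through 3 with a batch size of 2 triggers only two polls rather than four, which is why Spark caches one consumer per topic-partition on each executor.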
Summary

Kafka's native consumer supports two modes: Assign and Subscribe.

- Assign: the application specifies topic-partitions and starting offsets itself; the group id has no effect, and the application must maintain offsets on its own.
- Subscribe: the Kafka broker assigns topic-partitions to consumers automatically, so no starting offset needs to be specified. Here the group id takes effect: multiple consumers in the same group divide the partitions between them, so within the group no message is consumed twice.