第一个是刚开始重平衡 还没开始
第二是已经分配好了
建议你在第一个回调里提交offset
第二个回调里获取offset进行重置
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
consumer.commitAsync(); // 提交偏移量
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 获取该分区下已消费的偏移量
long commitedOffset = -1;
for (TopicPartition topicPartition : partitions) {
// 获取该分区下已消费的偏移量
commitedOffset = consumer.committed(topicPartition).offset();
// 重置偏移量到上一次提交的偏移量下一个位置处开始消费
consumer.seek(topicPartition, commitedOffset + 1);
}
}
assign不受这个影响
这个是subscribe方式进行消费
网友评论