Scenario:
A job consumes messages from Kafka and stores them in a database. When an error is encountered, it stops committing offsets, closes the consumer, and sends an alert SMS.
The project had been running for a long time, but we suddenly noticed that after the consumer was closed, restarting it would sometimes lose messages. So we went back to the code to find the cause. The consumer code is as follows:
@Override
public void run() {
    // register this consumer thread so it can be monitored
    Thread thread = Thread.currentThread();
    ThreadHolder threadHolder = new ThreadHolder(this.id, this.name, thread.getState());
    THREAD_HOLDER_MAP.put(this.id, threadHolder);
    log.info("consumer task start, id = " + id);
    while (runnable) {
        int partition = 0;
        long offset = 0;
        String key = null;
        String topic = null;
        String value = null;
        try {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                partition = record.partition();
                offset = record.offset();
                key = record.key();
                topic = record.topic();
                value = record.value();
                log.info(String.format("topic:%s, partition:%d, offset:%d, key:%s, message:%s", topic, partition, offset, key, value));
                consumer.process(record);
                kafkaConsumer.commitAsync();
            }
        } catch (MyException e) {
            // for our custom exception, stash the message in redis for later handling
            if (value != null) {
                log.warn("error message:{}", value);
                hashOperations.put(RedisKey.KAFKA_ERROR_KEY, CorrectUtils.join(topic, key, partition, offset), value);
                kafkaConsumer.commitAsync();
            }
        } catch (Exception e) {
            log.warn("process message failure!", e);
            this.runnable = false;
            kafkaConsumer.close();
            THREAD_HOLDER_MAP.replace(this.id, new ThreadHolder(this.id, this.name, Thread.State.TERMINATED));
            log.info("consumer task shutdown, id = " + id);
        }
    }
}
Now let's analyze the problem. Experience says that message loss is usually caused by committing offsets incorrectly, so we start with the commitAsync() method and look at its source:
/**
 * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition.
 * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
 */
@Override
public void commitAsync() {
    commitAsync(null);
}
So this method commits the offsets of the records returned by the most recent poll. Because it is called inside the loop over ConsumerRecords, it causes a problem:
Suppose one poll returns ten messages. The very first commit already commits the highest offset of the batch, so if processing fails on the second message and the consumer is closed, the next restart will start consuming from the eleventh message, and the nine unprocessed messages are lost. This problem only appears when a single poll returns more than one message.
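Both fixes below lean on the overload of commitAsync (commitSync has the same overload) that takes an explicit map of offsets instead of committing the position of the whole batch. A minimal usage sketch, as it would look inside the record loop above (the callback is optional and may be null):

import java.util.Collections;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// inside for (ConsumerRecord<String, String> record : records):
// the committed value is the position of the NEXT record to read,
// so commit record.offset() + 1, not record.offset()
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
kafkaConsumer.commitAsync(
        Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)),
        (offsets, exception) -> {
            if (exception != null) {
                log.warn("async commit failed: {}", offsets, exception);
            }
        });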
Then we thought about how to fix it. There are the following options:
- Keep the commit inside the loop, but control the offset ourselves, committing each record's offset + 1 (as in the per-record commit sketched above). This guarantees no loss and no duplication, but at the cost of one commit request per message.
- Move the commit outside the loop, and when processing fails, commit before closing the consumer, with the committed offset being the current offset plus the number of messages already processed successfully (see the sketch after this list).
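A minimal sketch of the second option, only as an illustration of the idea rather than the project's final code. It reuses the partition/offset/topic variables declared at the top of the while loop in run(), omits the MyException branch for brevity, and assumes the simplest case where the failing batch comes from a single partition (records of other partitions in the same batch would be re-delivered, not lost). Instead of computing "start offset + number processed", it commits the failed record's own offset directly, which is the same resume point; imports are as in the previous sketch:

try {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        partition = record.partition();
        offset = record.offset();
        topic = record.topic();
        consumer.process(record);
    }
    // commit once per poll, only after the whole batch has been processed
    kafkaConsumer.commitAsync();
} catch (Exception e) {
    log.warn("process message failure!", e);
    if (topic != null) {
        // before closing, commit up to the last successfully processed record:
        // the failed record's offset is exactly where we want to resume,
        // and a synchronous commit ensures it lands before close()
        TopicPartition tp = new TopicPartition(topic, partition);
        kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offset)));
    }
    this.runnable = false;
    kafkaConsumer.close();
}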