package com.djh.consumertest.config.consumer.processor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
// MessageListenerConcurrently 并发的处理监听
// MessageListenerOrderly 顺序的处理监听
public class MQConsumeListenerOrderly implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context){
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
// 设置自动提交
context.setAutoCommit(true);
for(MessageExt msg : msgs) { // 批量获取消息
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
String body = new String(msg.getBody(), "utf-8");
// 模拟业务逻辑处理...
log.info("Consumer-获取消息-msgId为{}, 主题topic为={}, 标签tags为={},QueueId为={},消费消息为={}", msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getQueueId(), body);
// if(keys.equals("testkey1")){
// System.out.println("消息消费失败");
// int i = 1/0;
// }
//Order order = JSON.parseObject(body, Order.class);
//String code = order.getOrderId();
} catch (Exception e) {
e.printStackTrace();
// int reconsumeTimes = msg.getReconsumeTimes();
// System.out.println("失败次数reconsumeTimes:"+ reconsumeTimes);
// if(reconsumeTimes == 3){
// // 写入日志
// // 做补偿处理
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }
// System.out.println("消费失败");
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
网友评论