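To guarantee that messages are consumed in order, send each group of messages that must stay ordered to the same queue on the same broker, and have the consumer use an orderly listener (MessageListenerOrderly).
The code below sends ten orders; each order passes through three states: created, paid, and shipped.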
@RequestMapping("/order_mq")
@ResponseBody
public Result order_mq() {
    // One tag per order state: created, paid, shipped.
    String[] tags = new String[] { "createTag", "payTag", "sendTag" };
    for (int orderId = 0; orderId < 10; orderId++) {
        for (int type = 0; type < 3; type++) {
            String body = orderId + ":" + type;
            // Constructor arguments: topic, tag, key, body.
            Message msg = new Message("orderTopic", tags[type % tags.length], body, body.getBytes(StandardCharsets.UTF_8));
            sender.sendOrderMessage(orderId, msg);
        }
    }
    return Result.success("Hello,world");
}
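MQSender.java
public void sendOrderMessage(int orderId, Message msg) {
    log.info("send message:" + msg);
    // Messages that share a hash key (here the order ID) are routed to the same queue.
    // Send synchronously: several in-flight async sends may reach the broker out of order.
    rocketMQTemplate.syncSendOrderly("orderTopic", msg, String.valueOf(orderId), 3000);
}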
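The hash key passed by the sender (the order ID) is what keeps one order's messages together. The idea can be sketched in plain Java with no RocketMQ dependency: the queue index is derived from the key's hash modulo the number of queues, so equal keys always land in the same queue. The names `QueueSelectorSketch`, `selectQueue`, and `route`, as well as the queue count of 4, are illustrative assumptions; the real client's selector may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class QueueSelectorSketch {

    // Hypothetical helper: map a hash key to a queue index in [0, queueCount).
    public static int selectQueue(String hashKey, int queueCount) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    // Route the 3 state messages of each of 10 orders, mirroring the controller loop.
    public static Map<Integer, List<String>> route(int queueCount) {
        Map<Integer, List<String>> queues = new TreeMap<>();
        for (int orderId = 0; orderId < 10; orderId++) {
            for (int type = 0; type < 3; type++) {
                int index = selectQueue(String.valueOf(orderId), queueCount);
                queues.computeIfAbsent(index, k -> new ArrayList<>()).add(orderId + ":" + type);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        // Assumed queue count of 4, chosen only for illustration.
        route(4).forEach((index, messages) ->
                System.out.println("queue " + index + " -> " + messages));
    }
}
```

Several orders may share a queue, but the three messages of any single order never spread across queues, which is the precondition for consuming them in order.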
MQReceiver.java must implement RocketMQPushConsumerLifecycleListener and register a MessageListenerOrderly:
@Service
@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "my-consumer_orderTopic")
public class MQReceiver1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver1.class);

    // Required by RocketMQListener (payload type String is assumed here); left empty
    // because the listener registered in prepareStart() handles the messages.
    @Override
    public void onMessage(String message) {
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // An orderly listener handles the messages of each queue one at a time, in queue order.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    log.info(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
    }
}
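To see why the consumer side needs an orderly listener, here is a minimal simulation in plain Java; it is not RocketMQ code. Each queue is drained by exactly one task, so messages inside a queue are handled in arrival order while different queues still proceed in parallel. `OrderlyConsumeSketch` and `consume` are hypothetical names, and the queue contents are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderlyConsumeSketch {

    // Drain each queue with its own single task and return the messages
    // in the order in which they were handled.
    public static List<String> consume(List<List<String>> queues) {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(queues.size());
        for (List<String> queue : queues) {
            // One task per queue: messages of a queue are never handled concurrently.
            pool.submit(() -> {
                for (String message : queue) {
                    handled.add(message);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        List<List<String>> queues = Arrays.asList(
                Arrays.asList("0:0", "0:1", "0:2", "2:0", "2:1", "2:2"),
                Arrays.asList("1:0", "1:1", "1:2"));
        // Interleaving across queues may vary between runs; the sequence within each order does not.
        System.out.println(consume(queues));
    }
}
```

A concurrent listener, by contrast, may hand messages from the same queue to several threads at once, so the three states of one order could be processed out of sequence.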