RocketMQ OrderMessage demo


Author: totohui | Published: 2018-04-27 15:15

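How do you guarantee that messages are consumed in order? Send every group of messages that must be consumed in order to the same queue on the same broker, and have the consumer use an orderly listener (MessageListenerOrderly).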
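To make the "same queue" idea concrete, here is a minimal sketch of hash-based queue selection in plain Java. It does not use RocketMQ's own classes; the class name `QueueSelectionSketch`, the method `selectQueue`, and the queue count of 4 are assumptions made for illustration only.

```java
public class QueueSelectionSketch {

    // Map a hash key to a queue index; the same key always yields the same index.
    static int selectQueue(String hashKey, int queueCount) {
        // floorMod keeps the result in [0, queueCount) even for negative hash codes
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // hypothetical number of queues for the topic
        for (int orderId = 0; orderId < 10; orderId++) {
            String hashKey = String.valueOf(orderId);
            System.out.println("order " + orderId + " -> queue " + selectQueue(hashKey, queueCount));
        }
    }
}
```

Because the index depends only on the key, every message of one order lands on one queue, while different orders can still be spread across queues.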

The code below sends ten orders; each order goes through three states: created, paid, and shipped.

    @RequestMapping("/order_mq")
    @ResponseBody
    public Result order_mq() {
        // One tag per order state: created, paid, shipped
        String[] tags = new String[] { "createTag", "payTag", "sendTag" };
        for (int orderId = 0; orderId < 10; orderId++) {
            for (int type = 0; type < 3; type++) {
                String content = orderId + ":" + type;
                // Message(topic, tags, keys, body)
                Message msg = new Message("orderTopic", tags[type % tags.length], content, content.getBytes());
                sender.sendOrderMessage(orderId, msg);
            }
        }
        return Result.success("Hello,world");
    }

    MQSender.java

    public void sendOrderMessage(int orderId, Message msg) {
        log.info("send message:" + msg);
        // The order ID is the hash key, so all messages of one order go to the same queue
        rocketMQTemplate.asyncSendOrderly("orderTopic", msg, orderId + "", null, 3000);
    }

MQReceiver.java must implement RocketMQPushConsumerLifecycleListener and register a MessageListenerOrderly:

    @Service
    @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "my-consumer_orderTopic")
    public class MQReceiver1 implements RocketMQListener, RocketMQPushConsumerLifecycleListener {

        private static Logger log = LoggerFactory.getLogger(MQReceiver1.class);

        // Note: the onMessage method required by RocketMQListener is not shown in the original.

        @Override
        public void prepareStart(DefaultMQPushConsumer consumer) {
            // Register an orderly listener so each queue is consumed sequentially
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    for (MessageExt msg : msgs) {
                        log.info(new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
        }
    }
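One way to check the demo is to collect the logged message bodies and confirm that, for each order, the states arrive as 0, 1, 2. The sketch below is plain Java and independent of RocketMQ; the class `OrderCheckSketch` and the method `isOrderedPerOrder` are hypothetical helpers, and the body format `orderId:type` is taken from the producer code above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderCheckSketch {

    // Returns true if, for every order, the types were seen as 0, 1, 2 in that order.
    static boolean isOrderedPerOrder(List<String> consumedBodies) {
        Map<String, Integer> nextExpectedType = new HashMap<>();
        for (String body : consumedBodies) {
            String[] parts = body.split(":");
            String orderId = parts[0];
            int type = Integer.parseInt(parts[1]);
            int expected = nextExpectedType.getOrDefault(orderId, 0);
            if (type != expected) {
                return false; // a state arrived before its predecessor
            }
            nextExpectedType.put(orderId, expected + 1);
        }
        return true;
    }

    public static void main(String[] args) {
        // Interleaving across orders is fine; only the order within one orderId matters.
        List<String> interleaved = Arrays.asList("0:0", "1:0", "0:1", "1:1", "0:2", "1:2");
        System.out.println(isOrderedPerOrder(interleaved)); // prints true

        // "paid" (1) before "created" (0) for order 0 violates the ordering.
        List<String> broken = Arrays.asList("0:1", "0:0", "0:2");
        System.out.println(isOrderedPerOrder(broken)); // prints false
    }
}
```

Ordering is only expected within a single order, since different orders may be routed to different queues and consumed concurrently.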


Article link: https://www.haomeiwen.com/subject/ccnblftx.html