美文网首页
rocketmq-consumer

rocketmq-consumer

作者: 划水者 | 来源:发表于2018-08-19 09:44 被阅读0次

rocketmq 消费消息大致有以下几种场景类型

乱序消费,消息被乱序的发送的队列,消费者在消费各个队列时是并行消费,所以不能保证消息的有序性

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TestTopic", "*");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setConsumeTimestamp("20170422221800");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");

顺序消息,发送到同一个队列的消息需要保证有序消费

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

    consumer.subscribe("TestTopic", "*");

    consumer.setNamesrvAddr("localhost:9876");

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    consumer.setConsumeTimestamp("20170422221800");

    consumer.registerMessageListener(new MessageListenerOrderly() {

        @Override
        public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
                                            final ConsumeOrderlyContext context){
            for(MessageExt messageExt : msgs){
                System.out.println(new String(messageExt.getBody());
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();

    System.out.printf("Consumer Started.%n");

顺序消息用的比较多的是订单系统,订单状态之间的扭转需要保证有序,所以通常同一个订单ID发送相关的状态消息需要保证有序

集群消费和广播消费的区别
集群消费,同一个消费组,均匀的消费该topic下的消息,该消费组下所有的消费者消费的总消息等于该topic下的消息

广播消费,同一个消费组下的每个消费者都消费该topic下的所有消息

相关文章

  • rocketmq-consumer

    rocketmq 消费消息大致有以下几种场景类型 乱序消费,消息被乱序的发送的队列,消费者在消费各个队列时是并行消...

  • rocketMq-consumer介绍

    说到rocketMq的consumer,该篇文章特指pushConsumer,pullConsumer在后续文章中...

  • 五、RocketMQ-Consumer启动流程

    一、概述 一个最简单的Consumer的启动代码如下: 最重要的有几个步骤: new DefaultMQPushC...

网友评论

      本文标题:rocketmq-consumer

      本文链接:https://www.haomeiwen.com/subject/jizqiftx.html