美文网首页
rocketmq-consumer

rocketmq-consumer

作者: 划水者 | 来源:发表于2018-08-19 09:44 被阅读0次

    rocketmq 消费消息大致有以下几种场景类型

    乱序消费,消息被乱序的发送的队列,消费者在消费各个队列时是并行消费,所以不能保证消息的有序性

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TestTopic", "*");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20170422221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
    
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    

    顺序消息,发送到同一个队列的消息需要保证有序消费

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    
        consumer.subscribe("TestTopic", "*");
    
        consumer.setNamesrvAddr("localhost:9876");
    
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
        consumer.setConsumeTimestamp("20170422221800");
    
        consumer.registerMessageListener(new MessageListenerOrderly() {
    
            @Override
            public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
                                                final ConsumeOrderlyContext context){
                for(MessageExt messageExt : msgs){
                    System.out.println(new String(messageExt.getBody());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    
        System.out.printf("Consumer Started.%n");
    

    顺序消息用的比较多的是订单系统,订单状态之间的扭转需要保证有序,所以通常同一个订单ID发送相关的状态消息需要保证有序

    集群消费和广播消费的区别
    集群消费,同一个消费组,均匀的消费该topic下的消息,该消费组下所有的消费者消费的总消息等于该topic下的消息

    广播消费,同一个消费组下的每个消费者都消费该topic下的所有消息

    相关文章

      网友评论

          本文标题:rocketmq-consumer

          本文链接:https://www.haomeiwen.com/subject/jizqiftx.html