6. RocketMQ Schedule Example: Delayed Messages

Author: ASD_92f7 | Published: 2019-04-11 15:16

1. Overview

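Reference: http://rocketmq.apache.org/docs/schedule-example/
A delayed (scheduled) message is not consumed immediately; it becomes available to consumers only after its delay has elapsed.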

2. ScheduledMessageProducer: the delayed-message producer

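The delay is not given as an arbitrary duration. It is set with message.setDelayTimeLevel(3);, which selects one of the predefined delay levels.
RocketMQ's default levels are:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
The levels can be configured when the Broker is started. This example uses level 3, which means a delay of 10 seconds.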
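
To make the level-to-delay mapping concrete, here is a minimal sketch that parses the default table quoted above and looks up the delay for a given level. It is only an illustration: the class DelayLevels and its methods are hypothetical names and are not part of the RocketMQ API, it uses only the JDK, and it handles only the s, m and h units that appear in the default table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a RocketMQ class.
final class DelayLevels {
    // Default table as quoted in this article; a Broker may be configured differently.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses a table such as "1s 5s 10s" into milliseconds; list index 0 holds level 1.
    static List<Long> parse(String table) {
        List<Long> millis = new ArrayList<>();
        for (String token : table.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': millis.add(value * 1_000L); break;
                case 'm': millis.add(value * 60_000L); break;
                case 'h': millis.add(value * 3_600_000L); break;
                default: throw new IllegalArgumentException("Unknown unit in " + token);
            }
        }
        return millis;
    }

    // Returns the delay in milliseconds for a 1-based level of the default table.
    static long delayMillis(int level) {
        List<Long> levels = parse(DEFAULT_LEVELS);
        if (level < 1 || level > levels.size()) {
            throw new IllegalArgumentException("Level out of range: " + level);
        }
        return levels.get(level - 1);
    }

    public static void main(String[] args) {
        // Level 3 is the third entry of the table, i.e. 10s.
        System.out.println("Level 3 delays by " + delayMillis(3) + " ms");
    }
}
```

Running main prints "Level 3 delays by 10000 ms", which matches the 10-second delay chosen in this example. Levels are 1-based, so level 1 corresponds to 1s and level 18 to 2h.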

package com.asd.rocket.controller.sche;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/11 15:00
 */
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("10.1.11.155:9876");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 1;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("qqq","TagA", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            // messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
        // Shutdown producer after use.
        producer.shutdown();
    }
}

3. Consumer

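This is just an ordinary consumer. The delay is applied on the Broker side, so the consumer needs no special settings.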

package com.asd.rocket.controller.chapter2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 16:17
 */
public class OrderedConsumer {
    public static void main(String[] args) throws  Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("10.1.11.155:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("qqq", "TagA || TagB || TagC");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                // Pass the values as printf arguments so a '%' in the message body cannot break the format string
                System.out.printf("%s Receive New Messages: %s, from queue: %d%n",
                        Thread.currentThread().getName(), new String(msgs.get(0).getBody()), msgs.get(0).getQueueId());
                this.consumeTimes.incrementAndGet();
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
