一、概述
参考资料:http://rocketmq.apache.org/docs/schedule-example/
即消息不是立即被消费,而是过了延迟时间后消费
二、ScheduledMessageProducer 延迟消息生产者
延迟并不是直接用具体的时间定义的,是通过message.setDelayTimeLevel(3);来实现的
RocketMQ默认的级别如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
可以在启动Broker的时候设置这个级别,这里选择 3,即延迟10秒
package com.asd.rocket.controller.sche;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
* @author zhangluping@sinosoft.com.cn
* @date 2019/4/11 15:00
*/
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("10.1.11.155:9876");
// Launch producer
producer.start();
int totalMessagesToSend = 1;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("qqq","TagA", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
三、Consumer消费者
就是一个普通的消费者
package com.asd.rocket.controller.chapter2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author zhangluping@sinosoft.com.cn
* @date 2019/4/10 16:17
*/
public class OrderedConsumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr("10.1.11.155:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("qqq", "TagA || TagB || TagC");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " +
new String(msgs.get(0).getBody()) + ",从队列:"+msgs.get(0).getQueueId()+"获取,"+"%n");
this.consumeTimes.incrementAndGet();
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
网友评论