延迟消息
延迟消息主要针对生产者
延时队列的延时延时时间分为18个级别
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
public static void main(String[] args) throws MQClientException, InterruptedException {
//定义生产者, 生产者和消费者的group没有任何联系,生产者group是生产者集群,
DefaultMQProducer producer = new DefaultMQProducer ("please_rename_unique_group_name");
//生产者和消费者都是与nameserver连接,因为nameserver中维护了所有broker信息,包括topic和队列
producer.setNamesrvAddr ("192.168.44.145:9876");
//设置发送消息超时,默认为3秒,单位毫秒
producer.setSendMsgTimeout (1000);
//异步发送消息,重试次数,默认为2次
// producer.setRetryTimesWhenSendAsyncFailed (3);
//同步发送
producer.setRetryTimesWhenSendFailed (5);
//开启生产者
producer.start ();
for (int i = 0; i < 2; i++) {
try {
/**
* 第一个参数为topic
* 第二个参数tag,用与消费者过滤消息用,这种过滤发生在broker,而不是在消费者过滤,这样节省了带宽
* 第三个参数消息体
* message有构造函数中还有key,这个对消费者没有实际意义,是在查找message时可以快速查找
*/
Message msg = new Message ("TopicTest", "TagA" , ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//延时消息,开源版本只能定义下面的时间,不能自定义
//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//3级别为10s,就是10s后消息放到队列中
msg.setDelayTimeLevel(3);
//这种方式为同步发送,消息发送到broker并收到了broker返回的消息,线程到这是阻塞的,保证了消息的可靠性
//消息发送,发送失败重试间隔
SendResult sendResult = producer.send (msg,1000);
System.out.printf ("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace ();
Thread.sleep (1000);
}
}
//关闭生产者
producer.shutdown ();
}
网友评论