RocketMq提供消息过滤的功能,用于同一topic下,区分不同业务场景的消息。
Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,已经指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
使用springboot-starter的方式发送tag消息,只需要如下形式即可,不需要单独指定参数:
topic:tag
示例代码:
控制器,方便测试,将tag作为接口参数:
/**
* 消息过滤,发送tag消息
*/
@RequestMapping("/send/tag/{tag}")
public void tag(@PathVariable("tag") String tag) {
//发送格式topic:tag
rocketMqProducer.tag("消息过滤,发送tag消息", "test_tag:"+tag);
}
生产者:
/**
* tag消息过滤
*/
public void tag(String msgBody, String topic) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
if (ObjectUtils.isNotEmpty(sendResult)) {
//sendResult不空则表示消息发送成功
log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId());
}
}
消费者,获取tag为“Tom”的消息,selectorExpression设置为Tom,全部接收设置为*:
/**
* RocketMqProducer
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_tag", selectorExpression = "Tom", consumerGroup = "test_tag")
public class TagMessageListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("receive sync message:{}", msg);
}
}
下面分别发送Tom,Jerry,分别看结果:
发送Tom,接收到消息:
![](https://img.haomeiwen.com/i16830368/c58622ccaef4efaf.png)
2020-11-30 16:22:34.387 INFO 44396 --- [nio-8085-exec-6] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 消息过滤,发送tag消息, messageId = AC100208AD6C18B4AAC298DBFE77000F
2020-11-30 16:22:34.388 INFO 44396 --- [MessageThread_2] c.c.b.m.r.consumer.TagMessageListener : receive sync message:消息过滤,发送tag消息
发送Jerry,消费者没收到消息:
![](https://img.haomeiwen.com/i16830368/258f5cd27e611621.png)
2020-11-30 16:22:03.677 INFO 44396 --- [nio-8085-exec-5] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 消息过滤,发送tag消息, messageId = AC100208AD6C18B4AAC298DB867F000C
网友评论