RocketMQ(十)高级特性-消息过滤

作者: 我犟不过你 | 来源:发表于2020-11-30 16:23 被阅读0次

RocketMq提供消息过滤的功能,用于同一topic下,区分不同业务场景的消息。

Tag,即消息标签,用于对某个Topic下的消息进行分类。消息队列RocketMQ版的生产者在发送消息时,已经指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。

使用springboot-starter的方式发送tag消息,只需要如下形式即可,不需要单独指定参数:

topic:tag

示例代码:
控制器,方便测试,将tag作为接口参数:

    /**
     * 消息过滤,发送tag消息
     */
    @RequestMapping("/send/tag/{tag}")
    public void tag(@PathVariable("tag") String tag) {
        //发送格式topic:tag
        rocketMqProducer.tag("消息过滤,发送tag消息", "test_tag:"+tag);
    }

生产者:

    /**
     * tag消息过滤
     */
    public void tag(String msgBody, String topic) {
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
        if (ObjectUtils.isNotEmpty(sendResult)) {
            //sendResult不空则表示消息发送成功
            log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId());
        }
    }

消费者,获取tag为“Tom”的消息,selectorExpression设置为Tom,全部接收设置为*:

/**
 * RocketMqProducer
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_tag", selectorExpression = "Tom", consumerGroup = "test_tag")
public class TagMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("receive sync message:{}", msg);
    }
}

下面分别发送Tom,Jerry,分别看结果:
发送Tom,接收到消息:

Tom
2020-11-30 16:22:34.387  INFO 44396 --- [nio-8085-exec-6] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 消息过滤,发送tag消息, messageId = AC100208AD6C18B4AAC298DBFE77000F
2020-11-30 16:22:34.388  INFO 44396 --- [MessageThread_2] c.c.b.m.r.consumer.TagMessageListener    : receive sync message:消息过滤,发送tag消息

发送Jerry,消费者没收到消息:

Jerry
2020-11-30 16:22:03.677  INFO 44396 --- [nio-8085-exec-5] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 消息过滤,发送tag消息, messageId = AC100208AD6C18B4AAC298DB867F000C

相关文章

网友评论

    本文标题:RocketMQ(十)高级特性-消息过滤

    本文链接:https://www.haomeiwen.com/subject/cbarwktx.html