美文网首页spring 整合使用
SpringBoot消息使用RocketMQ Tag

SpringBoot消息使用RocketMQ Tag

作者: AC编程 | 来源:发表于2021-09-23 12:28 被阅读0次

一、Topic+Tag

使用SpringBoot框架集成RocketMQ,我们使用的是RocketMQTemplate这种方式实现消息的发送和接收。如果我们只用Topic不用Tag,代码是这样的:

@Slf4j
@Lazy
@Component
public class TopicTagTestSender {

    private static final String PN = "TopicTag测试生产者, ";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.topic.topicTagTestTopic}")
    private String topicTagTestTopic;

    /**
     * 发送消息
     */
    private void asyncSend(EsTopic topic,String tags){
        String message = JSONObject.toJSONString(topic);

        rocketMQTemplate.asyncSend(topicTagTestTopic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(PN + "消息发送成功, result: {}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error(PN + "消息发送失败");
                e.printStackTrace();
            }
        });
    }
}
/**
 * Consumer for the TopicTag test topic. No {@code selectorExpression} is declared,
 * so the annotation's default selector applies.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic0}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestHandler implements RocketMQListener<MessageExt> {

    /**
     * Parses the message body as JSON into an {@link EsTopic} and logs it together
     * with the message id.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Passing the payload as a log argument (instead of calling toString() on it)
        // keeps the handler from throwing if parsing yields null.
        log.info("TopicTagTestHandler:{} (msgId={})", esTopic, message.getMsgId());
    }
}

如果要在Topic的基础上加上Tag,只需要在topicName后面拼接 `:tags` 即可,即destination的格式为 `topicName:tags`。
RocketMQTemplate.asyncSend源码如下

    /**
     * Same to {@link #asyncSend(String, Message, SendCallback)}.
     * <p>
     * Delegates to the four-argument {@code asyncSend} overload, passing
     * {@code producer.getSendMsgTimeout()} as the last argument.
     *
     * @param destination  formats: `topicName:tags`
     * @param payload      the Object to use as payload
     * @param sendCallback {@link SendCallback}
     */
    public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
        asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
    }

另外,@RocketMQMessageListener这个注解里selectorExpression默认是*,接收topic下全部消息。selectorExpression这个不支持配置,需要写成常量。

二、样例代码

2.1、Sender
package com.qimiao.qm.content.app.rocketmq.sender;

import com.alibaba.fastjson.JSONObject;
import com.qimiao.qm.common.core.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Lazy
@Component
public class TopicTagTestSender {

    private static final String PN = "TopicTag测试生产者, ";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.topic.topicTagTestTopic}")
    private String topicTagTestTopic;

    public static final String ADD_TAG="ADD";

    public static final String UPATE_TAG="UPDATE";

    public void asyncSendAll(EsTopic topic){
        asyncSend(topic,null);
    }

    public void asyncSendAdd(EsTopic topic){
        asyncSend(topic,ADD_TAG);
    }

    public void asyncSendUpdate(EsTopic topic){
        asyncSend(topic,UPATE_TAG);
    }

    /**
     * 发送消息
     */
    private void asyncSend(EsTopic topic,String tags){
        //Message<EsTopic> message = MessageBuilder.withPayload(topic).build();
        String message = JSONObject.toJSONString(topic);

        String destination = topicTagTestTopic;
        if(StringUtil.isNotEmpty(tags)){
            destination = topicTagTestTopic+":"+tags;
        }

        rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(PN + "消息发送成功, result: {}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error(PN + "消息发送失败");
                e.printStackTrace();
            }
        });
    }
}

2.2、TopicTagTestHandler
/**
 * Consumer for the TopicTag test topic. No {@code selectorExpression} is declared,
 * so the annotation's default selector applies.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic0}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestHandler implements RocketMQListener<MessageExt> {

    /**
     * Parses the message body as JSON into an {@link EsTopic} and logs it together
     * with the message id.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Passing the payload as a log argument (instead of calling toString() on it)
        // keeps the handler from throwing if parsing yields null.
        log.info("TopicTagTestHandler:{} (msgId={})", esTopic, message.getMsgId());
    }
}
2.3、TopicTagTestOneHandler
/**
 * Consumer for the TopicTag test topic that selects messages with the
 * {@code ADD} tag expression.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic1}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "ADD",
        // NOTE(review): the original author reports that a placeholder such as
        // ${rocketmq.tags.add} was not resolved here, hence the literal — confirm
        // against the rocketmq-spring version in use.
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestOneHandler implements RocketMQListener<MessageExt> {

    /**
     * Parses the message body as JSON into an {@link EsTopic} and logs it together
     * with the message id.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Passing the payload as a log argument (instead of calling toString() on it)
        // keeps the handler from throwing if parsing yields null.
        log.info("TopicTagTestOneHandler:{} (msgId={})", esTopic, message.getMsgId());
    }
}
2.4、TopicTagTestTwoHandler
/**
 * Consumer for the TopicTag test topic that selects messages with the
 * {@code UPDATE} tag expression.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic2}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "UPDATE",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestTwoHandler implements RocketMQListener<MessageExt> {

    /**
     * Parses the message body as JSON into an {@link EsTopic} and logs it together
     * with the message id.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Passing the payload as a log argument (instead of calling toString() on it)
        // keeps the handler from throwing if parsing yields null.
        log.info("TopicTagTestTwoHandler:{} (msgId={})", esTopic, message.getMsgId());
    }
}
2.5、TopicTagTestController
/**
 * REST endpoints mapped under {@code topic_tags} that trigger the test sends
 * exposed by {@link TopicTagTestSender}.
 */
@Api(tags = "Topic-Tag测试")
@RestController
@RequestMapping("topic_tags")
public class TopicTagTestController {

    @Resource
    private TopicTagTestSender topicTagTestSender;

    /**
     * Sends a test payload through {@code asyncSendAll}.
     *
     * @return the value of {@code Result.success()}
     */
    @ApiOperation(value = "测试(TopicTagTestHandler收到消息)")
    @GetMapping
    public Result<Boolean> test() {
        topicTagTestSender.asyncSendAll(new EsTopic("0", "测试"));
        return Result.success();
    }

    /**
     * Sends a test payload through {@code asyncSendAdd}.
     *
     * @return the value of {@code Result.success()}
     */
    @ApiOperation(value = "测试1(TopicTagTestHandler、TopicTagTestOneHandler收到消息)")
    @GetMapping("tag_one")
    public Result<Boolean> test1() {
        topicTagTestSender.asyncSendAdd(new EsTopic("1", "测试1"));
        return Result.success();
    }

    /**
     * Sends a test payload through {@code asyncSendUpdate}.
     *
     * @return the value of {@code Result.success()}
     */
    @ApiOperation(value = "测试2(TopicTagTestHandler、TopicTagTestTwoHandler收到消息)")
    @GetMapping("tag_two")
    public Result<Boolean> test2() {
        topicTagTestSender.asyncSendUpdate(new EsTopic("2", "测试2"));
        return Result.success();
    }
}

三、消费端订阅多个TAG

如果消费端需要订阅同一个Topic下的多个TAG,可以在selectorExpression中用||分隔。

/**
 * Consumer for the TopicTag test topic that selects messages with the
 * {@code ADD||UPDATE} tag expression.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic2}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "ADD||UPDATE",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestTwoHandler implements RocketMQListener<MessageExt> {

    /**
     * Parses the message body as JSON into an {@link EsTopic} and logs it together
     * with the message id.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Passing the payload as a log argument (instead of calling toString() on it)
        // keeps the handler from throwing if parsing yields null.
        log.info("TopicTagTestTwoHandler:{} (msgId={})", esTopic, message.getMsgId());
    }
}

相关文章

网友评论

    本文标题:SpringBoot消息使用RocketMQ Tag

    本文链接:https://www.haomeiwen.com/subject/mcingltx.html