一、Topic+Tag
使用SpringBoot框架集成RocketMQ时,我们使用RocketMQTemplate发送消息,使用@RocketMQMessageListener注解的监听器接收消息。如果我们只用Topic不用Tag,代码是这样的:
@Slf4j
@Lazy
@Component
public class TopicTagTestSender {
private static final String PN = "TopicTag测试生产者, ";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.topic.topicTagTestTopic}")
private String topicTagTestTopic;
/**
* 发送消息
*/
private void asyncSend(EsTopic topic,String tags){
String message = JSONObject.toJSONString(topic);
rocketMQTemplate.asyncSend(topicTagTestTopic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(PN + "消息发送成功, result: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error(PN + "消息发送失败");
e.printStackTrace();
}
});
}
}
/**
 * Consumer for the Topic+Tag demo that subscribes to the configured topic in
 * clustering mode. No {@code selectorExpression} is specified on the annotation.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic0}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it.
     *
     * @param message the raw RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging prints "null" rather than throwing if the body
        // deserializes to null, and records the message id for tracing.
        log.info("TopicTagTestHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
如果要在Topic的基础上加上Tag,只需要在 topicName 后面拼接 `:tags` 即可(即 destination 的格式为 `topicName:tags`)。
RocketMQTemplate.asyncSend 的源码如下:
/**
 * Sends {@code payload} asynchronously to {@code destination}, delegating to the
 * four-argument overload with the timeout returned by {@code producer.getSendMsgTimeout()}.
 * Otherwise the same as {@link #asyncSend(String, Message, SendCallback)}.
 *
 * @param destination formats: `topicName:tags`
 * @param payload the Object to use as payload
 * @param sendCallback {@link SendCallback}
 */
public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
}
另外,@RocketMQMessageListener 这个注解里的 selectorExpression 默认是 `*`,即接收该topic下的全部消息。在本文使用的 rocketmq-spring 版本中,selectorExpression 不支持通过 `${...}` 占位符从配置中读取,需要写成常量(较新版本的 rocketmq-spring 可能已支持解析占位符,请以实际使用的版本为准)。
二、样例代码
2.1、Sender
package com.qimiao.qm.content.app.rocketmq.sender;
import com.alibaba.fastjson.JSONObject;
import com.qimiao.qm.common.core.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Lazy
@Component
public class TopicTagTestSender {
private static final String PN = "TopicTag测试生产者, ";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.topic.topicTagTestTopic}")
private String topicTagTestTopic;
public static final String ADD_TAG="ADD";
public static final String UPATE_TAG="UPDATE";
public void asyncSendAll(EsTopic topic){
asyncSend(topic,null);
}
public void asyncSendAdd(EsTopic topic){
asyncSend(topic,ADD_TAG);
}
public void asyncSendUpdate(EsTopic topic){
asyncSend(topic,UPATE_TAG);
}
/**
* 发送消息
*/
private void asyncSend(EsTopic topic,String tags){
//Message<EsTopic> message = MessageBuilder.withPayload(topic).build();
String message = JSONObject.toJSONString(topic);
String destination = topicTagTestTopic;
if(StringUtil.isNotEmpty(tags)){
destination = topicTagTestTopic+":"+tags;
}
rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(PN + "消息发送成功, result: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error(PN + "消息发送失败");
e.printStackTrace();
}
});
}
}
2.2、TopicTagTestHandler
/**
 * Consumer for the Topic+Tag demo that subscribes to the configured topic in
 * clustering mode. No {@code selectorExpression} is specified on the annotation.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic0}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it.
     *
     * @param message the raw RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging prints "null" rather than throwing if the body
        // deserializes to null, and records the message id for tracing.
        log.info("TopicTagTestHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
2.3、TopicTagTestOneHandler
/**
 * Consumer for the Topic+Tag demo that subscribes to the configured topic in
 * clustering mode with the selector expression {@code "ADD"}.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic1}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        // NOTE: a placeholder such as ${rocketmq.tags.add} was observed not to be
        // resolved for selectorExpression, so the tag is hard-coded here.
        selectorExpression = "ADD",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestOneHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it.
     *
     * @param message the raw RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging prints "null" rather than throwing if the body
        // deserializes to null, and records the message id for tracing.
        log.info("TopicTagTestOneHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
2.4、TopicTagTestTwoHandler
/**
 * Consumer for the Topic+Tag demo that subscribes to the configured topic in
 * clustering mode with the selector expression {@code "UPDATE"}.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic2}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "UPDATE",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestTwoHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it.
     *
     * @param message the raw RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging prints "null" rather than throwing if the body
        // deserializes to null, and records the message id for tracing.
        log.info("TopicTagTestTwoHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
2.5、TopicTagTestController
/**
 * REST endpoints that trigger the Topic+Tag demo producer.
 *
 * <p>Each endpoint builds a fixed {@code EsTopic}, hands it to
 * {@code TopicTagTestSender}, and returns {@code Result.success()} right after the
 * asynchronous send is requested; the send outcome is not inspected here.
 */
@Api(tags = "Topic-Tag测试")
@RestController
@RequestMapping("topic_tags")
public class TopicTagTestController {

    @Resource
    private TopicTagTestSender topicTagTestSender;

    /**
     * Sends a message without a tag.
     *
     * @return the success result
     */
    @ApiOperation("测试(TopicTagTestHandler收到消息)")
    @GetMapping
    public Result<Boolean> test() {
        topicTagTestSender.asyncSendAll(new EsTopic("0", "测试"));
        return Result.success();
    }

    /**
     * Sends a message through {@code asyncSendAdd}.
     *
     * @return the success result
     */
    @ApiOperation("测试1(TopicTagTestHandler、TopicTagTestOneHandler收到消息)")
    @GetMapping("tag_one")
    public Result<Boolean> test1() {
        topicTagTestSender.asyncSendAdd(new EsTopic("1", "测试1"));
        return Result.success();
    }

    /**
     * Sends a message through {@code asyncSendUpdate}.
     *
     * @return the success result
     */
    @ApiOperation("测试2(TopicTagTestHandler、TopicTagTestTwoHandler收到消息)")
    @GetMapping("tag_two")
    public Result<Boolean> test2() {
        topicTagTestSender.asyncSendUpdate(new EsTopic("2", "测试2"));
        return Result.success();
    }
}
三、消费端订阅多个TAG
如果一个消费者需要订阅同一Topic下的多个TAG,可以在 selectorExpression 中用 `||` 分隔,例如 `ADD||UPDATE`。
/**
 * Consumer for the Topic+Tag demo that subscribes to the configured topic in
 * clustering mode with the selector expression {@code "ADD||UPDATE"}, i.e. two
 * tags separated by {@code ||}.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic2}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "ADD||UPDATE",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestTwoHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it.
     *
     * @param message the raw RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging prints "null" rather than throwing if the body
        // deserializes to null, and records the message id for tracing.
        log.info("TopicTagTestTwoHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
网友评论