1.引入阿里RocketMq依赖
<!-- https://mvnrepository.com/artifact/com.aliyun.openservices/ons-client -->
<!-- NOTE: ${ons-client.version} is a property placeholder; it must be defined elsewhere
     (e.g. in this POM's or a parent POM's <properties> section) for the build to resolve. -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>${ons-client.version}</version>
</dependency>
2.创建监听tag
/**
 * Push channels handled by the messaging setup.
 *
 * <p>Each constant carries a numeric code and the message tag that identifies
 * the channel. The tags are combined by {@link #allTag()} into a single
 * subscription expression so that one consumer can listen to every channel.
 */
@Getter
public enum PushWayEnum {
    /** WeChat channel, tagged {@code weixin}. */
    WEIXIN(0, "weixin"),
    /** SMS channel, tagged {@code sms}. */
    SMS(1, "sms");

    /** Separator between tags in a subscription expression. */
    private static final String TAG_SEPARATOR = "||";

    // Enum constants are shared singletons, so their state must be immutable.
    private final int code;
    private final String tag;

    PushWayEnum(int code, String tag) {
        this.code = code;
        this.tag = tag;
    }

    /**
     * Builds the subscription expression that matches every channel.
     *
     * @return all tags in declaration order joined by {@code ||},
     *         e.g. {@code "weixin||sms"}
     */
    public static String allTag() {
        PushWayEnum[] channels = values();
        String[] tags = new String[channels.length];
        for (int i = 0; i < channels.length; i++) {
            tags[i] = channels[i].tag;
        }
        return String.join(TAG_SEPARATOR, tags);
    }
}
3.创建配置类
@Configuration
@Component
@Slf4j
public class RocketMqConfig {
private final RocketMQConsumer consumer;
@Autowired
public RocketMqConfig(RocketMQConsumer consumer) {
this.consumer = consumer;
}
private Properties properties() {
Properties properties = new Properties();
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, "你的AccessKey");
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "你的SecretKey");
// 设置 TCP 接入域名(此处以公共云生产环境为例)
properties.put(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
return properties;
}
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean producer) {
ProducerBean producerBean = new ProducerBean();
Properties properties = properties(systemPropertyHelper);
properties.put(PropertyKeyConst.ProducerId,"你的生产者id");
producerBean.setProperties(properties);
return producerBean;
}
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean smsConsumerBean(SystemPropertyHelper propertyHelper) {
ConsumerBean consumerBean = new ConsumerBean();
Properties properties = properties(propertyHelper);
properties.put(PropertyKeyConst.ConsumerId, "你的消费者id");
consumerBean.setProperties(properties);
//绑定监听的topic
Subscription subscription = new Subscription();
subscription.setTopic("你的topic");
//绑定要监听的tag,多个tag用 || 隔开
subscription.setExpression(PushWayEnum.allTag());
Map<Subscription, MessageListener> map = new HashMap<>(2);
map.put(subscription, consumer);
consumerBean.setSubscriptionTable(map);
return consumerBean;
}
}
4.创建消息生产者
@Component
@Slf4j
public class RocketMQProducer {
private final ProducerBean producer;
@Autowired
public RocketMQProducer(ProducerBean producer) {
this.producer = producer;
}
/**
* 发送消息
*
* @param tag 你要发送消息的tag
* @param key
* @param body
*/
public void sendMessage(String tag, String key, byte[] body) {
Message msg = new Message("你要发送到哪个topic", tag, body);
try {
if (StringUtils.isNotBlank(key)) {
msg.setKey(key);
}
SendResult sendResult = producer.send(msg);
if (sendResult != null) {
log.info("消息发送成功:{}", sendResult.toString());
}
} catch (ONSClientException e) {
log.info("消息发送失败:{}", e);
}
}
}
5.创建消息消费者
@Component
@Slf4j
public class RocketMQConsumer implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
String response = new String(message.getBody());
String tag = message.getTag();
try {
if (StringUtils.equals(tag, PushWayEnum.SMS.getTag())) {
log.info("收到短信消息,消息内容:{}",response);
} else if (StringUtils.equals(tag, PushWayEnum.WEIXIN.getTag())) {
log.info("收到微信消息,消息内容:{}",response);
}
//确认消息已经消费
return Action.CommitMessage;
} catch (Exception e) {
//稍后重新消费
return Action.ReconsumeLater;
}
}
}
6.附录
- 阿里云 RocketMQ 官方文档:https://help.aliyun.com/document_detail/52591.html?spm=a2c4g.11186623.6.595.26ZBBH
- Apache RocketMQ Spring Boot 集成文档:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spring-boot-starter/README_zh_CN.md
网友评论