1. 导入依赖
compile 'org.apache.rocketmq:rocketmq-client:4.5.2'
2. 编写application.yml配置
image.png
3. 引入配置信息
为了方便,在这里消费者和生产者都放在一个项目里
引入生产者配置信息
package utry.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Configuration holder for the message producer.
 *
 * <p>Declared with the {@code rocketmq.producer} configuration-properties prefix; the
 * name-server address field additionally carries a {@code ${namesrvAddr}} value placeholder.
 */
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class ProducerPropertiesConfig {

    /** Name-server address handed to the producer. */
    @Value("${namesrvAddr}")
    private String namesrvAddr;

    /** Producer group name. */
    private String groupName;

    /** Optional maximum message size; may be {@code null} when not configured. */
    private Integer maxMessageSize;

    /** Optional send timeout; may be {@code null} when not configured. */
    private Integer sendMsgTimeout;

    /** Optional retry count for failed sends; may be {@code null} when not configured. */
    private Integer retryTimesWhenSendFailed;

    /** @return the name-server address */
    public String getNamesrvAddr() {
        return this.namesrvAddr;
    }

    /** @param namesrvAddr the name-server address to store */
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    /** @return the producer group name */
    public String getGroupName() {
        return this.groupName;
    }

    /** @param groupName the producer group name to store */
    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    /** @return the configured maximum message size, possibly {@code null} */
    public Integer getMaxMessageSize() {
        return this.maxMessageSize;
    }

    /** @param maxMessageSize the maximum message size to store */
    public void setMaxMessageSize(Integer maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    /** @return the configured send timeout, possibly {@code null} */
    public Integer getSendMsgTimeout() {
        return this.sendMsgTimeout;
    }

    /** @param sendMsgTimeout the send timeout to store */
    public void setSendMsgTimeout(Integer sendMsgTimeout) {
        this.sendMsgTimeout = sendMsgTimeout;
    }

    /** @return the configured retry count for failed sends, possibly {@code null} */
    public Integer getRetryTimesWhenSendFailed() {
        return this.retryTimesWhenSendFailed;
    }

    /** @param retryTimesWhenSendFailed the retry count to store */
    public void setRetryTimesWhenSendFailed(Integer retryTimesWhenSendFailed) {
        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
    }

    /**
     * Renders the name-server address and group name only; the optional tuning
     * values are not part of the text.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ProducerConfig [namesrvAddr=");
        text.append(namesrvAddr);
        text.append(", groupName=");
        text.append(groupName);
        text.append("]");
        return text.toString();
    }
}
编写生产者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that creates and starts the message producer bean.
 */
@Configuration
public class ProducerConfigure {

    private static final Logger logger = LoggerFactory.getLogger(ProducerConfigure.class);

    @Autowired
    private ProducerPropertiesConfig producerPropertiesConfig;

    /**
     * Builds a {@link DefaultMQProducer} from the producer properties, starts it and
     * exposes it as a bean.
     *
     * <p>The optional tuning properties (maximum message size, send timeout, retry count)
     * are applied only when they are configured; otherwise the client defaults are kept.
     * The destroy method is declared explicitly so the producer is shut down together
     * with the application context.
     *
     * @return the started producer
     * @throws MQClientException if the producer fails to start
     */
    @Bean(destroyMethod = "shutdown")
    public DefaultMQProducer defaultProducer() throws MQClientException {
        logger.info(producerPropertiesConfig.toString());
        logger.info("defaultProducer 正在创建---------------------------------------");

        DefaultMQProducer producer = new DefaultMQProducer(producerPropertiesConfig.getGroupName());
        producer.setNamesrvAddr(producerPropertiesConfig.getNamesrvAddr());
        producer.setVipChannelEnabled(false);

        // Previously these bound properties were silently ignored; honour them when present.
        Integer maxMessageSize = producerPropertiesConfig.getMaxMessageSize();
        if (maxMessageSize != null) {
            producer.setMaxMessageSize(maxMessageSize);
        }
        Integer sendMsgTimeout = producerPropertiesConfig.getSendMsgTimeout();
        if (sendMsgTimeout != null) {
            producer.setSendMsgTimeout(sendMsgTimeout);
        }
        Integer retryTimesWhenSendFailed = producerPropertiesConfig.getRetryTimesWhenSendFailed();
        if (retryTimesWhenSendFailed != null) {
            producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
        }

        producer.start();
        logger.info("rocketmq producer server开启成功---------------------------------.");
        return producer;
    }
}
引入消费者配置信息
package utry.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Configuration holder for the message consumer.
 *
 * <p>Declared with the {@code rocketmq.consumer} configuration-properties prefix; the
 * name-server address field additionally carries a {@code ${namesrvAddr}} value placeholder.
 */
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
public class ConsumerPropertiesConfig {

    /** Consumer group name. */
    private String groupName;

    /** Name-server address handed to the consumer. */
    @Value("${namesrvAddr}")
    private String namesrvAddr;

    /** @return the consumer group name */
    public String getGroupName() {
        return this.groupName;
    }

    /** @param groupName the consumer group name to store */
    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    /** @return the name-server address */
    public String getNamesrvAddr() {
        return this.namesrvAddr;
    }

    /** @param namesrvAddr the name-server address to store */
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    /** Renders the group name followed by the name-server address. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ConsumerConfig [groupName=");
        text.append(groupName);
        text.append(", namesrvAddr=");
        text.append(namesrvAddr);
        text.append("]");
        return text.toString();
    }
}
编写消费者
先编写一个抽象类,再写具体的实现
- 编写抽象类
package utry.config;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author
* 抽象的消息消费者
*/
@Service
public abstract class DefaultConsumerConfigure {
Logger log = LoggerFactory.getLogger(DefaultConsumerConfigure.class);
@Autowired
private ConsumerPropertiesConfig consumerConfig;
/**
* 开启消费者监听服务
* @param topic
* @param tag
* @throws MQClientException
*/
public void listener(String topic, String tag) throws MQClientException {
log.info("开启" + topic + ":" + tag + "消费者-------------------");
log.info(consumerConfig.toString());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());
consumer.subscribe(topic, tag);
// 开启内部类实现监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
return DefaultConsumerConfigure.this.dealBody(messageExtList);
}
});
consumer.start();
log.info("rocketmq启动成功---------------------------------------");
}
/**
* 处理body的业务
* @param messageExtList
* @return
*/
public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> messageExtList);
}
- 编写消费者
package utry.config;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import java.util.List;
/**
 * Message consumer that starts listening on topic {@code t_TopicTest} / tag {@code Tag1}
 * when the application context is refreshed and logs the body of every message received.
 */
@Configuration
public class CustomConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent> {

    private static final Logger log = LoggerFactory.getLogger(CustomConsumer.class);

    /** Set once the listener has started, so later refresh events do not start it again. */
    private boolean listenerStarted;

    /**
     * Starts the consumer on the first refresh event for which startup succeeds.
     * Subsequent refresh events are ignored; a failed start is logged and retried on
     * the next refresh event.
     *
     * @param contextRefreshedEvent the refresh event that triggered this call
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (listenerStarted) {
            return;
        }
        try {
            super.listener("t_TopicTest", "Tag1");
            listenerStarted = true;
        } catch (MQClientException e) {
            log.error("消费者监听器启动失败", e);
        }
    }

    /**
     * Decodes each message body as UTF-8 text and logs it.
     *
     * @param messageExtList the received messages
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus dealBody(List<MessageExt> messageExtList) {
        log.info("接收到消息");
        for (MessageExt msg : messageExtList) {
            try {
                String msgStr = new String(msg.getBody(), "utf-8");
                log.info(msgStr);
            } catch (Exception e) {
                // Keep the cause in the log; previously the exception was dropped.
                log.error("body转字符串解析失败", e);
            }
        }
        // NOTE(review): a message whose body failed to decode is still acknowledged as consumed.
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
编写Controller测试
package utry.controller;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import utry.config.CustomConsumer;
/**
 * Test controller that publishes a message to topic {@code t_TopicTest} / tag {@code Tag1}.
 */
@RestController
public class ProducerController {

    // Logger is bound to this controller (it was previously created for CustomConsumer).
    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    @Autowired
    private DefaultMQProducer producer;

    /**
     * Sends {@code info} as the message body using the producer's callback-based send.
     *
     * <p>The body is encoded as UTF-8 so that it matches the UTF-8 decoding done on the
     * consuming side, independent of the platform default charset.
     *
     * @param info text to send as the message body; must not be {@code null}
     * @throws IllegalArgumentException if {@code info} is {@code null}
     * @throws Exception if handing the message to the producer fails
     */
    @GetMapping("/msg/product")
    public void test(String info) throws Exception {
        if (info == null) {
            throw new IllegalArgumentException("request parameter 'info' is required");
        }
        byte[] body = info.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message("t_TopicTest", "Tag1", "12345", body);

        // Asynchronous send: the outcome is delivered to the callback (much like an AJAX
        // call), so this method returns before the result of the send is known.
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("传输成功");
                log.info(JSON.toJSONString(sendResult));
            }

            @Override
            public void onException(Throwable e) {
                log.error("传输失败", e);
            }
        });
    }
}
github:
网友评论