消息队列中间件是分布式系统中的重要组件,主要用于解决应用解耦、异步消息、日志记录、流量削峰、分布式事务等问题,从而实现高性能、高可用、可伸缩和最终一致性的架构。
1. Maven添加rocketmq依赖
<!-- RocketMQ client library. The version is read from the rocketmq.version
     Maven property, which must be defined in this POM or in a parent POM. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
2. 设置配置项信息
# RocketMQ client settings. Nesting is significant: the producer and consumer
# sections each carry their own "enable" switch and group name.
rocketmq:
  enable: true
  namesrvAddr: host:port # name server address
  instanceName: XX # client instance name
  defaultProducer:
    enable: true
    producerGroup: XXProducer
  defaultConsumer:
    enable: true
    # NOTE(review): this key is "producerGroup" inside the consumer section;
    # it presumably should be "consumerGroup" - confirm against the code that
    # binds these properties before renaming.
    producerGroup: XXConsumer
    topic: topic1,topic2,... # comma-separated list of topics
    batchSize: 10
    minThread: 4
    maxThread: 16
3. 消息生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.Charset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
public class XXProducer {
private static final Logger logger = LoggerFactory.getLogger(XXProducer.class);
private String bodyCharset = "UTF-8";
@Autowired
private String assetsModifyTopic;
public boolean sendMessage(String body, topicName) {
Message message = new Message();
try {
message.setTopic(topicName);
message.setBody(body.getBytes(Charset.forName(bodyCharset)));
SendResult sendResult = defaultMqProducer.send(message);
if (sendResult == null) {
logger.warn("消息发送失败. topicName: {}, body: {}, err: {}", topicName, body);
return false;
} else {
return ture;
}
} catch (Exception e) {
logger.warn("消息发送异常. topicName: {}, body: {}, err: {}", topicName, body);
return false;
}
}
}
4. 消息消费者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Repository("messageListenerConcurrently")
public class XXConsumer implements MessageListenerConcurrently {
private static final Logger logger = LoggerFactory.getLogger(XXConsumer.class);
@Oveerider
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : msgs) {
try {
// TODO 业务逻辑
} catch (Exception e) {
logger.warn(~);
return ConsumeConcurrentlyStatus.CONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
网友评论