美文网首页SpringBoot
SpirngBoot使用RocketMQ

SpirngBoot使用RocketMQ

作者: HachiLin | 来源:发表于2020-12-17 16:57 被阅读0次

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,日志记录,流量削锋、分布式事务等问题,实现高性能,高可用,可伸缩和最终一致性架构。

    1. Maven添加rocketmq依赖

    <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>${rocketmq.version}</version>
    </dependency>
    

    2. 设置配置项信息

    rocketmq:
        enable: true
        namesrvAddr: host:port
        instanceName: XX  # 消息名
        defaultProducer:
            enable: true
            producerGroup: XXProducer
        defaultConsumer:
            enable: true
            producerGroup: XXConsumer
            topic: topic1,topic2,...
            batchSize: 10
            minThread: 4
            maxThread: 16
    

    3. 消息生产者

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import java.nio.charset.Charset;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Component
    public class XXProducer {
        private static final Logger logger = LoggerFactory.getLogger(XXProducer.class);
        
        private String bodyCharset = "UTF-8";
        
        @Autowired
        private String assetsModifyTopic;
        
        public boolean sendMessage(String body, topicName) {
            Message message = new Message();
            try {
                message.setTopic(topicName);
                message.setBody(body.getBytes(Charset.forName(bodyCharset)));
                SendResult sendResult = defaultMqProducer.send(message);
                if (sendResult == null) {
                    logger.warn("消息发送失败. topicName: {}, body: {}, err: {}", topicName, body);
                    return false;
                } else {
                    return ture;
                }
            } catch (Exception e) {
                logger.warn("消息发送异常. topicName: {}, body: {}, err: {}", topicName, body);
                return false;
            }
        }
    }
    

    4. 消息消费者

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Repository("messageListenerConcurrently")
    public class XXConsumer implements MessageListenerConcurrently {
        private static final Logger logger = LoggerFactory.getLogger(XXConsumer.class);
        
        @Oveerider
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt msg : msgs) {
                try {
                    // TODO 业务逻辑
                } catch (Exception e) {
                    logger.warn(~);
                    return ConsumeConcurrentlyStatus.CONSUME_LATER; 
                }    
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    

    5. 参考

    相关文章

      网友评论

        本文标题:SpirngBoot使用RocketMQ

        本文链接:https://www.haomeiwen.com/subject/fnvzgktx.html