美文网首页RocketMQspringbootkankan(good)
springboot集成RocketMQ使用

springboot集成RocketMQ使用

作者: Geroge1226 | 来源:发表于2023-02-27 11:08 被阅读0次

    1、 说明

    Springboot集成RocketMQ时需要特别注意版本问题,否则会出现各类启动报错问题,这里使用的springboot 版本:2.2.7.RELEASE, RocketMq版本:2.2.3

    2、集成过程

    • pom文件引用
      这里网上有单独在引用rocketmq-client依赖包的,是不需要的,已经包含在rocketmq-spring-boot-starter依赖包中。
     <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.7.RELEASE</version>
        </parent>
     
     <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-spring-boot-starter</artifactId>
          <version>2.1.1</version>
     </dependency>
    
    • yml文件配置
    spring:
      # 数据库信息
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/longfamily?useUnicode=true&characterEncoding=utf-8&userSSL=false
        username: root
        password: 123456
    
    ##RocketMq配置
    rocketmq:
      #nameservice服务器地址(多个以英文逗号隔开)
     # 注意高版本下使用的是name-server,低版本使用的是 nameServer。
      name-server: 10.0.164.31:9876
      producer:
        group: test
    
    

    这里把spring配置也贴出来,是想说明rocketMq与spring在yaml文件中是同一级别(容易当成spring节点下面的子节点)。否则会启动报错。

    • 消费者
    @Component
    @RocketMQMessageListener(consumerGroup = "H", topic = RocketMQTopic.TOPIC_ORDER)
    public class TestMQListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("消费者收到信息:==="+message);
        }
    }
    

    消费者是通过RocketMQMessageListener来监听Topc, 通过实现RocketMQListener <T>接口实现其onMessage(T t)方法来处理接收到的消息

    • 生产者
      这里同意封装了RocketMQ发送消息的工具类。
    @Slf4j
    @Component
    public class RocketMQUtils {
    
        @Resource
        private RocketMQTemplate rocketMQTemplate;
        /**
         * 异步发送MQ消息
         */
        public void sendMessage(Long id, final String topic, final String context, Boolean isExternal) {
            try {
                log.debug("Sending message to MQ topic {}, context {}", topic, context);
                SendCallback callback = new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        //打印msgId用来以备查验,外部消息发送mq成功则任务置为成功
                        log.info("Success sending message to MQ: {}, context: {}, msgId: {}", topic, context, sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        log.error("Failed to send message to MQ {}, msg {}, cause {}", topic, context, Throwables.getStackTraceAsString(e));
                    }
                };
                rocketMQTemplate.asyncSend(topic, context, callback);
            } catch (Exception e) {
                log.error("Failed to send message to MQ! message: {}, stackTrace: {}", context, Throwables.getStackTraceAsString(e));
            }
        }
    }
    
    • 测试发送消息接口
    @RestController
    @RequestMapping("/api/test")
    public class CommonTestController {
    
        @Autowired
        private RocketMQUtils rocketMQUtils;
        // mq
        @PostMapping("/mq/send")
        public void operateRocketMQ(){
            rocketMQUtils.sendMessage(1L, RocketMQTopic.TOPIC_ORDER,"firstContext",true);
        }
    
    • 运行结果
    [2023-02-27 10:17:30.741] [DEBUG] [T:http-nio-8061-exec-3][tid=][Class:c.g.c.u.RocketMQUtils -> sendMessage]|Sending message to MQ topic topic-order, context firstContext
    [2023-02-27 10:17:30.760] [INFO] [T:NettyClientPublicExecutor_2][tid=][Class:c.g.c.u.RocketMQUtils -> onSuccess]|Success sending message to MQ: topic-order, context: firstContext, msgId: 0000010169EE18B4AAC2881AB17B0001
    消费者收到信息:===firstContext
    

    相关文章

      网友评论

        本文标题:springboot集成RocketMQ使用

        本文链接:https://www.haomeiwen.com/subject/wmdjwrtx.html