美文网首页
springboot 2.x 整合rocketmq-spring

springboot 2.x 整合rocketmq-spring

作者: Zal哥哥 | 来源:发表于2020-12-29 17:11 被阅读0次

    一、导入maven依赖,rocketmq-spring-boot-starter可在maven中央仓库查找

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
    </dependency>
    rocketmq 服务搭建在此省略.....

    二、yml 配置

    rocketmq:
    name-server: http://127.0.0.1:9876 #rocketmq服务地址
    producer:
    group: rocketmq_test #自定义的组名称
    send-message-timeout: 3000 #消息发送超时时长
    若rocketmq是在阿里云平台买的,使用以下配置即可

    rocketmq:
    name-server: http://xxxxx.mq-internet-access.mq-internet.aliyuncs.com:80 #阿里云rocketmq连接地址
    producer:
    group: rocketmq_test #自定义的组名称
    access-key: your aliyun accessKey
    secret-key: your aliyun secretKey
    send-message-timeout: 3000 #消息发送超时时长
    三、发送消息

    @Service
    public class RocketMQProducer{

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer messageTimeOut;
    
    /**
     * 发送普通消息
     */
    public void sendMsg(String msgBody){
        rocketMQTemplate.syncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build());
    }
    
    /**
     * 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑
     */
    public void sendAsyncMsg(String msgBody){
        rocketMQTemplate.asyncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 处理消息发送成功逻辑
            }
            @Override
            public void onException(Throwable e) {
                // 处理消息发送异常逻辑
            }
        });
    }
    
    /**
     * 发送延时消息<br/>
     * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>
     */
    public void sendDelayMsg(String msgBody, Integer delayLevel){
        rocketMQTemplate.syncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(),messageTimeOut,delayLevel);
    }
    
    /**
     * 发送带tag的消息,直接在topic后面加上":tag"
     */
    public void sendTagMsg(String msgBody){
        rocketMQTemplate.syncSend("queue_test_topic:tag1",MessageBuilder.withPayload(msgBody).build());
    }
    

    }
    四、接收消息

    /**

    • rocketmq 消息监听,@RocketMQMessageListener中的selectorExpression为tag,默认为*
      /
      @Slf4j
      @Component
      @RocketMQMessageListener(topic = "queue_test_topic",selectorExpression="
      ",consumerGroup = "queue_group_test")
      public class RocketMQMsgListener implements RocketMQListener<MessageExt> {

      @Autowired
      private RocketMQTemplate rocketMQTemplate;

      @Override
      public void onMessage(MessageExt message) {
      byte[] body = message.getBody();
      String msg = new String(body);
      log.info("接收到消息:{}", msg);
      }

    }

    https://blog.csdn.net/qq_38306688/article/details/107716046

    相关文章

      网友评论

          本文标题:springboot 2.x 整合rocketmq-spring

          本文链接:https://www.haomeiwen.com/subject/yqcqoktx.html