RocketMQ and Spring Cloud Stream

Author: just_like_you | Published: 2019-08-20 17:44

    Spring Cloud Stream

    Using RocketMQ

    Producer

    • Add the configuration
    rocketmq:
      name-server: 192.168.18.91:9876 # RocketMQ name server address
      producer:
        group: test-group # producer group name
    
    • Send the message with RocketMQTemplate. This example uses RocketMQ's transactional messages, its mechanism for distributed transactions.
    this.rocketMQTemplate.sendMessageInTransaction(
                        "tx-add-bonus", // transactional producer group; must match the listener's txProducerGroup
                        "add-bonus",    // destination topic
                        MessageBuilder.withPayload(
                                UserAddBonusMsgDTO.builder()
                                        .bonus(500)
                                        .userId(share.getUserId())
                                        .build()
                        )
                                .setHeader("transactionId", transactionId)
                                .setHeader("shareId", id)
                                .build(),
                        // argument handed to executeLocalTransaction as arg
                        auditDTO
                );
    
    • Listen for the producer's local transaction, and use a transactionId to help determine whether the local transaction was committed
    @Component
    @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus")
    @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    public class AddBonusMQTransactionListener implements RocketMQLocalTransactionListener {
    
        private final ShareServiceImpl shareService;
        private final RocketmqTxLogMapper rocketmqTxLogMapper;
    
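        /**
         * Executes the local transaction after the half message has been sent.
         *
         * @param msg the message
         * @param arg the argument passed to sendMessageInTransaction
         * @return {@link RocketMQLocalTransactionState}
         */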
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                MessageHeaders headers = msg.getHeaders();
                Integer shareId = Optional.ofNullable(headers.get("shareId"))
                        .map(String::valueOf)
                        .map(Integer::valueOf).orElseThrow(() -> new IllegalArgumentException("shareId must not be empty"));
                String transactionId = (String) msg.getHeaders().get("transactionId");
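                // the business method that runs as the local transaction
                this.shareService.updateStatusWithRocketTx(shareId, (AuditDTO) arg,transactionId);
                // on success, signal the broker to commit the message
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                // on any failure, signal the broker to discard the half message
                return RocketMQLocalTransactionState.ROLLBACK;
            }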
        }
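        /**
         * Check-back from the RocketMQ server when it has not received a commit or rollback.
         *
         * @param msg the message
         * @return {@link RocketMQLocalTransactionState}
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String transactionId = (String) msg.getHeaders().get("transactionId");
            // without a transactionId the query would match every log row, so roll back explicitly
            if (!StringUtils.isNotBlank(transactionId)) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            // a log row for this transactionId means the local transaction was committed
            if (rocketmqTxLogMapper.selectCount(Wrappers.<RocketmqTxLog>lambdaQuery().eq(RocketmqTxLog::getTransactionId, transactionId)) == 1) {
                return RocketMQLocalTransactionState.COMMIT;
            }
            return RocketMQLocalTransactionState.ROLLBACK;
        }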
    }
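
The listener above plugs into RocketMQ's transactional-message flow: the broker first holds a half message, the producer runs its local transaction and reports COMMIT or ROLLBACK, and if that report never arrives the broker checks back. The following is a minimal in-memory sketch of that flow in plain Java, not RocketMQ code. `TxFlowSketch`, `txLog`, `delivered`, and `sendInTransaction` are hypothetical names used only for illustration, and the sketch assumes the transaction log entry is written together with the business update.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** In-memory model of the half-message / commit / rollback / check-back flow (illustration only). */
class TxFlowSketch {
    enum State { COMMIT, ROLLBACK }

    /** Stands in for the transaction log table: ids whose local transaction committed. */
    final Set<String> txLog = new HashSet<>();
    /** Payloads that became visible to consumers. */
    final List<String> delivered = new ArrayList<>();

    /** Runs the local transaction and maps its outcome to a state, like executeLocalTransaction. */
    State executeLocalTransaction(String transactionId, Runnable localTx) {
        try {
            localTx.run();
            // assumption: the log entry is written in the same local transaction as the business update
            txLog.add(transactionId);
            return State.COMMIT;
        } catch (RuntimeException e) {
            return State.ROLLBACK;
        }
    }

    /** Answers a check-back from the transaction log, like checkLocalTransaction. */
    State checkLocalTransaction(String transactionId) {
        return txLog.contains(transactionId) ? State.COMMIT : State.ROLLBACK;
    }

    /** The half message is delivered only if the final state is COMMIT. */
    State sendInTransaction(String payload, String transactionId, Runnable localTx, boolean replyLost) {
        State reported = executeLocalTransaction(transactionId, localTx);
        // if the producer's reply is lost, the broker decides from the check-back instead
        State finalState = replyLost ? checkLocalTransaction(transactionId) : reported;
        if (finalState == State.COMMIT) {
            delivered.add(payload);
        }
        return finalState;
    }
}
```

Calling `sendInTransaction("m1", "tx-1", () -> {}, true)` models a lost reply: the message is still delivered because the check-back finds `tx-1` in the log, which is the role the transactionId plays in the listener above.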
    

    Consumer

    • Just add a listener
    A plain consumer listener:
    @Service
    @RocketMQMessageListener(topic = "add-bonus",consumerGroup = "consumer-group")
    @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {
        private final UserMapper userMapper;
        private final BonusEventLogMapper bonusEventLogMapper;
    
        @Override
        public void onMessage(UserAddBonusMsgDTO message) {
            Optional.ofNullable(message.getUserId())
                    .map(userMapper::selectById)
                    .ifPresent(user -> {
                        user.setBonus(user.getBonus() + message.getBonus());
                        userMapper.updateById(user);
                    });
    
            Optional.of(bonusEventLogMapper.insert(
                    BonusEventLog.builder()
                            .createTime(LocalDateTime.now())
                            .description("Add bonus")
                            .event("add-bonus")
                            .userId(message.getUserId())
                            .value(message.getBonus())
                            .build()
            )).filter(i -> i == 1).orElseThrow(() -> new IllegalArgumentException("failed to add bonus"));
        }
    }
    

    Basic usage of Spring Cloud Stream

    • Add the dependency
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            </dependency>
    
    • Add the annotation and configuration

    Add the annotation:

    @EnableBinding({CustomerChannel.class}) // uses the custom send/receive channel interface
    public class TestController
    

    Custom interface:

    public interface CustomerChannel {
    
        String OUTPUT = "my-output";   // this name is the <channelName> in spring.cloud.stream.bindings.<channelName> and spring.cloud.stream.rocketmq.bindings.<channelName>
        String INPUT = "my-input";
    
        @Output(CustomerChannel.OUTPUT)
        MessageChannel output();
    
    
        @Input(CustomerChannel.INPUT)
        SubscribableChannel input();
    
    }
    

    application.yml configuration:

    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 192.168.18.91:9876
            bindings:
              my-input:
                consumer:
                  tags: tag2 || tag1
          bindings:
            my-input:
              destination: my-stream-topic
              group: my-stream-group
            my-output:
              destination: my-stream-topic
    
    • Write the code
        // Use a controller endpoint to simulate sending a message; headers set with setHeader (here the tag) drive message filtering
        @GetMapping("/test-stream-send-2")
        public String testCustomInterfaceSendMsg() {
            this.customerChannel.output()
                    .send(MessageBuilder.withPayload("send message").setHeader(RocketMQHeaders.TAGS, "tag3").build());
            return "success";
        }
        // listen for messages with @StreamListener
        @StreamListener(value = CustomerChannel.INPUT)
        public void testCustomListener(Message message) {
            System.out.println(message.getPayload().toString());
        }
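
Note that the `my-input` binding above subscribes with `tags: tag2 || tag1`, while the controller sends the message with tag `tag3`, so that message is not expected to reach the `@StreamListener`. The following plain-Java sketch approximates how such a tag expression selects messages. It is an illustration of the matching rule, not the broker's or the binder's implementation, and `TagFilterSketch` with its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Approximates tag-based filtering for a subscription expression such as "tag2 || tag1". */
class TagFilterSketch {

    /** Splits the expression on "||" and trims each tag. */
    static Set<String> parse(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
    }

    /** A message is accepted if the subscription is "*" or lists the message's tag. */
    static boolean accepts(String expression, String messageTag) {
        Set<String> tags = parse(expression);
        return tags.contains("*") || tags.contains(messageTag);
    }
}
```

With this rule, `accepts("tag2 || tag1", "tag3")` is false and `accepts("tag2 || tag1", "tag1")` is true, so changing the header value in the controller to `tag1` or `tag2` would let the listener receive the message.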
    

    Message filtering in Spring Cloud Stream

    Notes from an expert's blog

    Monitoring endpoints for Spring Cloud Stream

    • /actuator/bindings
    • /actuator/channels
    • /actuator/health
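
These endpoints can be queried over HTTP once they are exposed; in Spring Boot 2.x that usually means listing them under `management.endpoints.web.exposure.include`. The following is a small sketch of a probe in plain Java. The class name `ActuatorProbe` and the base URL `http://localhost:8080` are assumptions for illustration; adjust them to the actual application.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Scanner;

/** Builds and fetches the actuator URLs listed above (illustration only). */
class ActuatorProbe {
    static final String[] ENDPOINTS = {"bindings", "channels", "health"};

    /** Joins a base URL and an endpoint name into /actuator/<endpoint>. */
    static URI endpointUri(String baseUrl, String endpoint) {
        String base = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
        return URI.create(base + "/actuator/" + endpoint);
    }

    /** Performs a GET and returns the response body as text. */
    static String fetch(URI uri) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        } finally {
            conn.disconnect();
        }
    }
}
```

For example, looping over `ActuatorProbe.ENDPOINTS` and calling `fetch(endpointUri("http://localhost:8080", name))` prints the JSON each endpoint returns, provided the application is running at that assumed address.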


          Article link: https://www.haomeiwen.com/subject/dsfmsctx.html