美文网首页
RocketMQ 基本用法

RocketMQ 基本用法

作者: Tinyspot | 来源:发表于2024-01-17 11:53 被阅读0次

    1. RocketMQ 使用

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>5.1.4</version>
    </dependency>
    

    1.1 Spring Boot 整合 RocketMQ 包

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
    

    2. 生产者 (Producer) 示例

    @Test
    public void sendDemo() throws MQClientException {
        MessageProducer messageProducer = new MessageProducer();
        messageProducer.init();
    
        String keys = UUID.randomUUID().toString().replace("-", "");
        MessageExt messageExt = new MessageExt();
        // 消息头
        messageExt.putUserProperty("SCENE_TYPE", "CREATE_ORDER");
        // 消息体
        OrderDTO orderDTO = new OrderDTO("S001", "10001");
        messageExt.setBody(JSON.toJSONBytes(orderDTO));
        // 发送同步消息,并获取结果
        ResultDTO<SendResult> sendResult = messageProducer.send("my_topic", "my_tag", messageExt, keys, true);
        System.out.println(sendResult);
    }
    

    2.1 发送同步消息

    @RestController
    @RequestMapping("/message")
    public class MessageController {
    
        @Resource
        private MessageProducer messageProducer;
    
        @RequestMapping("/send")
        public String send() {
            MessageExt messageExt = new MessageExt();
            // 消息头
            messageExt.putUserProperty("SCENE_TYPE", "CREATE_ORDER");
            // 消息体
            OrderDTO orderDTO = new OrderDTO("S001", "10001");
            messageExt.setBody(JSON.toJSONBytes(orderDTO));
            String keys = UUID.randomUUID().toString().replace("-", "");
            // 发送同步消息,并获取结果
            ResultDTO<SendResult> sendResult = messageProducer.send("my_topic", "my_tag", messageExt, keys, true);
            return JSON.toJSONString(sendResult);
        }
    }
    
    @Component
    public class MessageProducer {
        private final static Logger logger = LoggerFactory.getLogger(MessageProducer.class);
    
        private DefaultMQProducer producer;
    
        @PostConstruct
        public void init() throws MQClientException {
            producer = new DefaultMQProducer("producer_group");
            producer.start();
        }
    
        @PreDestroy
        public void shutdown() {
            if (producer != null) {
                producer.shutdown();
                logger.info("rocket_producer_destroy_success");
            }
        }
    
        public ResultDTO<SendResult> send(String topic, String tags, MessageExt messageExt, String keys, boolean printLog) {
            if (printLog) {
                logger.info("send_message_params, topic={}, tags={}, messageExt={}, body={}",
                        topic, tags, messageExt, new String(messageExt.getBody()));
            }
            try {
                messageExt.setTopic(topic);
                messageExt.setTags(tags);
                messageExt.setKeys(keys);
    
                SendResult sendResult = producer.send(messageExt);
                if (sendResult != null && SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                    logger.info("send_message_success, messageId={}", sendResult.getMsgId());
                }
                return ResultDTO.success(sendResult);
            } catch (Exception e) {
                logger.error("send_message_exception, topic={}, tags={}, messageExt={}, body={}, exception={}",
                        topic, tags, messageExt, new String(messageExt.getBody()), e);
                return ResultDTO.fail("send_message_error", e.getMessage());
            }
        }
    }
    

    2.2 发送异步消息

    @RequestMapping("/sendAsync")
    public String sendAsync() {
        MessageExt messageExt = new MessageExt();
        messageExt.putUserProperty("SCENE_TYPE", "CREATE_ORDER");
        OrderDTO orderDTO = new OrderDTO("S001", "10001");
        messageExt.setBody(JSON.toJSONBytes(orderDTO));
        String keys = UUID.randomUUID().toString().replace("-", "");
    
        messageProducer.sendAsync("my_topic", "my_tag", messageExt, keys, true);
    
        return "send async message success";
    }
    
    /**
     * 发送异步消息
     */
    public void sendAsync(String topic, String tags, MessageExt messageExt, String keys, boolean printLog) {
        if (printLog) {
            logger.info("send_message_params, topic={}, tags={}, messageExt={}, body={}",
                    topic, tags, messageExt, new String(messageExt.getBody()));
        }
        try {
            messageExt.setTopic(topic);
            messageExt.setTags(tags);
            messageExt.setKeys(keys);
    
            // 设置回调函数
            producer.send(messageExt, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    logger.info("send_message_onSuccess, messageId={}", sendResult.getMsgId());
                }
    
                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                    logger.error("send_message_onException, exception={}", throwable.getMessage());
                }
            });
        } catch (Exception e) {
            logger.error("send_message_exception, topic={}, tags={}, messageExt={}, body={}, exception={}",
                    topic, tags, messageExt, new String(messageExt.getBody()), e);
        }
    }
    

    2.3 失败重试

    public boolean send(String topic, String tags, byte[] msg, String keys, Map<String, String> userProperties) {
        if (StringUtils.isBlank(topic) || msg == null) {
            return false;
        }
        Message message = new Message(topic, tags, msg);
        if (StringUtils.isNotBlank(keys)) {
            message.setKeys(keys);
        }
        if (MapUtils.isNotEmpty(userProperties)) {
            userProperties.forEach(message::putUserProperty);
        }
        // 失败后自动重试两次
        boolean sendResult = send(message);
        if (!sendResult) {
            sendResult = send(message);
        }
        if (!sendResult) {
            sendResult = send(message);
        }
        if (!sendResult) {
            logger.error("send_message_exception, topic={}, tags={}, message={}", topic, tags, new String(msg));
        }
        return true;
    }
    
    protected boolean send(Message message) {
        try {
            SendResult result = producer.send(message);
            if (result != null && SendStatus.SEND_OK.equals(result.getSendStatus())) {
                return true;
            }
        } catch (Exception e) {
            logger.error("send_message_exception, message={}, exception={}", message, e);
        }
        return false;
    }
    

    3. 消费者 (Consumer) 示例

    @Component
    public class MessageConsumer {
        private final static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    
        private DefaultMQPushConsumer consumer;
    
        @PostConstruct
        public void init() throws MQClientException {
            consumer = new DefaultMQPushConsumer("consumer_group");
            consumer.setNamesrvAddr("localhost:9876");
    
            // 订阅Topic下所有Tag的消息
            // consumer.subscribe("my_topic", "*");
            consumer.subscribe("my_topic", "tag_test");
    
            // CONSUME_FROM_LAST_OFFSET(默认值):从最新的消息开始消费,即忽略之前的所有消息
            // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            // 批量消费的消息数量上限
            consumer.setConsumeMessageBatchMaxSize(1);
    
            // 消息事件监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt message : messages) {
                            logger.info("consumer_message, topic={}, tags={}, msgId={}, properties={}, body={}",
                                    message.getTopic(), message.getTags(), message.getMsgId(), message.getProperties(), new String(message.getBody()));
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        // 消息重试
                        logger.error("consumer_message_exception_retry, topic=my_topic, tags=tag_test, exception={}", e.getMessage());
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
    
            consumer.start();
            logger.info("consumer_message_success");
        }
    }
    

    3.2 消息过滤

    public void consumerMessage() throws MQClientException {
        consumer = new DefaultMQPushConsumer("consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeMessageBatchMaxSize(1);
    
        /**
         * subscribe(String topic, MessageSelector messageSelector)
         *
         * 按内容过滤:MessageSelector.bySql("status = 1001 or status = 1002");
         * 按属性过滤:MessageSelector.bySql("userProperties['sceneType'] = '1'");
         */
        // 消息过滤
        consumer.subscribe("my_topic", "tag1 || tag2");
        // consumer.subscribe("my_topic", MessageSelector.byTag("tag1 || tag2"));
    
        // 消息事件监听
        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            try {
                for (MessageExt message : messages) {
                    // do something...
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
    
        consumer.start();
        logger.info("consumer_message_success");
    }
    

    3.3 RocketMQ:Message filtering

    文档 https://www.alibabacloud.com/help/en/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/message-filtering?spm=a2c63.p38356.0.0.2e626814CogIhJ

    SQL92 syntax Description Example
    AND OR
    = <>
    IS NULL
    IS NOT NULL Attribute a exists
    > >= < <=
    IN (xxx, xxx)
    BETWEEN xxx AND xxx
    NOT BETWEEN xxx AND xxx

    相关文章

      网友评论

          本文标题:RocketMQ 基本用法

          本文链接:https://www.haomeiwen.com/subject/oovlodtx.html