美文网首页
SpringBoot集成RocketMQ

SpringBoot集成RocketMQ

作者: ce5154e79490 | 来源:发表于2020-07-14 18:33 被阅读0次

    1. 导入依赖

    compile 'org.apache.rocketmq:rocketmq-client:4.5.2'

    image.png

    2. 编写application.yml配置

    image.png

    3. 引入配置信息

    为了方便,在这里消费者和生产者都放在一个项目里

    引入生产者配置信息

    package utry.config;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Configuration properties for the message producer.
     *
     * <p>Declared as a configuration-properties class with the prefix
     * {@code rocketmq.producer}; the name server address field is additionally
     * annotated with the {@code ${namesrvAddr}} placeholder.
     */
    @Configuration
    @ConfigurationProperties(prefix = "rocketmq.producer")
    public class ProducerPropertiesConfig {

        /** Producer group name. */
        private String groupName;

        /** Name server address, injected from the {@code namesrvAddr} placeholder. */
        @Value("${namesrvAddr}")
        private String namesrvAddr;

        /** Optional maximum message size; {@code null} until a value is set. */
        private Integer maxMessageSize;

        /** Optional send timeout; {@code null} until a value is set. */
        private Integer sendMsgTimeout;

        /** Optional retry count used when a send fails; {@code null} until a value is set. */
        private Integer retryTimesWhenSendFailed;

        /** @return the producer group name */
        public String getGroupName() {
            return this.groupName;
        }

        /** @param groupName the producer group name */
        public void setGroupName(String groupName) {
            this.groupName = groupName;
        }

        /** @return the name server address */
        public String getNamesrvAddr() {
            return this.namesrvAddr;
        }

        /** @param namesrvAddr the name server address */
        public void setNamesrvAddr(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }

        /** @return the maximum message size, possibly {@code null} */
        public Integer getMaxMessageSize() {
            return this.maxMessageSize;
        }

        /** @param maxMessageSize the maximum message size */
        public void setMaxMessageSize(Integer maxMessageSize) {
            this.maxMessageSize = maxMessageSize;
        }

        /** @return the send timeout, possibly {@code null} */
        public Integer getSendMsgTimeout() {
            return this.sendMsgTimeout;
        }

        /** @param sendMsgTimeout the send timeout */
        public void setSendMsgTimeout(Integer sendMsgTimeout) {
            this.sendMsgTimeout = sendMsgTimeout;
        }

        /** @return the retry count used when a send fails, possibly {@code null} */
        public Integer getRetryTimesWhenSendFailed() {
            return this.retryTimesWhenSendFailed;
        }

        /** @param retryTimesWhenSendFailed the retry count used when a send fails */
        public void setRetryTimesWhenSendFailed(Integer retryTimesWhenSendFailed) {
            this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
        }

        /**
         * Renders only the name server address and group name; the remaining
         * properties are not part of the output.
         */
        @Override
        public String toString() {
            return String.format("ProducerConfig [namesrvAddr=%s, groupName=%s]", this.namesrvAddr, this.groupName);
        }
    }
    
    

    编写生产者

    
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration that creates and starts the message producer bean.
     */
    @Configuration
    public class ProducerConfigure {

        private static final Logger logger = LoggerFactory.getLogger(ProducerConfigure.class);

        @Autowired
        private ProducerPropertiesConfig producerPropertiesConfig;

        /**
         * Builds a {@link DefaultMQProducer} from {@link ProducerPropertiesConfig} and starts it.
         *
         * <p>The group name and name server address are always applied and the VIP
         * channel is disabled. The optional settings (maximum message size, send
         * timeout, retry count on send failure) are applied only when a value is
         * present in the properties; otherwise the client's own defaults are kept.
         *
         * @return the started producer
         * @throws MQClientException if the producer fails to start
         */
        @Bean(destroyMethod = "shutdown")
        public DefaultMQProducer defaultProducer() throws MQClientException {
            logger.info("{}", producerPropertiesConfig);
            logger.info("defaultProducer 正在创建---------------------------------------");

            DefaultMQProducer producer = new DefaultMQProducer(producerPropertiesConfig.getGroupName());
            producer.setNamesrvAddr(producerPropertiesConfig.getNamesrvAddr());
            producer.setVipChannelEnabled(false);

            // Previously these bound properties were silently ignored; honour them when present.
            Integer maxMessageSize = producerPropertiesConfig.getMaxMessageSize();
            if (maxMessageSize != null) {
                producer.setMaxMessageSize(maxMessageSize);
            }
            Integer sendMsgTimeout = producerPropertiesConfig.getSendMsgTimeout();
            if (sendMsgTimeout != null) {
                producer.setSendMsgTimeout(sendMsgTimeout);
            }
            Integer retryTimesWhenSendFailed = producerPropertiesConfig.getRetryTimesWhenSendFailed();
            if (retryTimesWhenSendFailed != null) {
                producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
            }

            producer.start();
            logger.info("rocketmq producer server开启成功---------------------------------.");
            return producer;
        }
    }
    

    引入消费者配置信息

    package utry.config;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Configuration properties for the message consumer.
     *
     * <p>Declared as a configuration-properties class with the prefix
     * {@code rocketmq.consumer}; the name server address field is additionally
     * annotated with the {@code ${namesrvAddr}} placeholder.
     */
    @ConfigurationProperties(prefix = "rocketmq.consumer")
    @Configuration
    public class ConsumerPropertiesConfig {

        /** Name server address, injected from the {@code namesrvAddr} placeholder. */
        @Value("${namesrvAddr}")
        private String namesrvAddr;

        /** Consumer group name. */
        private String groupName;

        /** @return the name server address */
        public String getNamesrvAddr() {
            return this.namesrvAddr;
        }

        /** @param namesrvAddr the name server address */
        public void setNamesrvAddr(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }

        /** @return the consumer group name */
        public String getGroupName() {
            return this.groupName;
        }

        /** @param groupName the consumer group name */
        public void setGroupName(String groupName) {
            this.groupName = groupName;
        }

        /** Renders the group name followed by the name server address. */
        @Override
        public String toString() {
            return String.format("ConsumerConfig [groupName=%s, namesrvAddr=%s]", this.groupName, this.namesrvAddr);
        }

    }
    

    编写消费者
    先编写一个抽象类,再写具体的实现

    1. 编写抽象类
    package utry.config;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.List;
    
    /**
     * @author
     * 抽象的消息消费者
     */
    @Service
    public abstract class DefaultConsumerConfigure {
        Logger log = LoggerFactory.getLogger(DefaultConsumerConfigure.class);
    
        @Autowired
        private ConsumerPropertiesConfig consumerConfig;
    
        /**
         * 开启消费者监听服务
         * @param topic
         * @param tag
         * @throws MQClientException
         */
        public void listener(String topic, String tag) throws MQClientException {
            log.info("开启" + topic + ":" + tag + "消费者-------------------");
            log.info(consumerConfig.toString());
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
    
            consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());
    
            consumer.subscribe(topic, tag);
    
            // 开启内部类实现监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                    return DefaultConsumerConfigure.this.dealBody(messageExtList);
                }
            });
            consumer.start();
    
            log.info("rocketmq启动成功---------------------------------------");
    
        }
    
        /**
         * 处理body的业务
         * @param messageExtList
         * @return
         */
        public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> messageExtList);
    
    }
    
    
    2. 编写消费者
    package utry.config;
    
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.event.ContextRefreshedEvent;
    
    import java.util.List;
    
    /**
     * Concrete message consumer.
     *
     * <p>Starts listening on topic {@code t_TopicTest} with tag {@code Tag1} when a
     * {@link ContextRefreshedEvent} is received, and logs the body of every
     * message it receives.
     */
    @Configuration
    public class CustomConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent> {

        Logger log = LoggerFactory.getLogger(CustomConsumer.class);

        /** Set once the listener has been started, so repeated refresh events do not start it again. */
        private final java.util.concurrent.atomic.AtomicBoolean listening =
                new java.util.concurrent.atomic.AtomicBoolean(false);

        /**
         * Starts the consumer on the first refresh event. Later refresh events are
         * ignored once the consumer is running; if starting fails, the guard is
         * released so a subsequent event can try again.
         *
         * @param contextRefreshedEvent the received event (its content is not inspected)
         */
        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            if (!listening.compareAndSet(false, true)) {
                return;
            }
            boolean started = false;
            try {
                super.listener("t_TopicTest", "Tag1");
                started = true;
            } catch (MQClientException e) {
                log.error("消费者监听器启动失败", e);
            } finally {
                if (!started) {
                    listening.set(false);
                }
            }

        }

        /**
         * Logs each message body decoded as UTF-8. A message whose body cannot be
         * converted is logged as an error and skipped; the batch is always
         * reported as consumed successfully.
         *
         * @param messageExtList the received messages
         * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
         */
        @Override
        public ConsumeConcurrentlyStatus dealBody(List<MessageExt> messageExtList) {
            log.info("接收到消息");
            for (MessageExt msg : messageExtList) {
                try {
                    String msgStr = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    log.info("{}", msgStr);
                } catch (RuntimeException e) {
                    // Keep the cause in the log instead of discarding it.
                    log.error("body转字符串解析失败", e);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }
    
    

    编写Controller测试

    package utry.controller;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import utry.config.CustomConsumer;
    
    /**
     * Test controller that publishes a message through the injected producer.
     */
    @RestController
    public class ProducerController {

        // Logger category is this controller (it previously pointed at CustomConsumer).
        private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

        @Autowired
        private DefaultMQProducer producer;


        /**
         * Sends {@code info} as a message to topic {@code t_TopicTest} with tag
         * {@code Tag1} and key {@code 12345}, using the asynchronous send API.
         * The outcome is reported through the callback and only logged; this
         * method returns without waiting for it.
         *
         * @param info the message text; encoded as UTF-8, matching how the consumer decodes it
         * @throws IllegalArgumentException if {@code info} is {@code null}
         * @throws Exception if handing the message to the producer fails
         */
        @GetMapping("/msg/product")
        public void test(String info) throws Exception {
            if (info == null) {
                throw new IllegalArgumentException("Request parameter 'info' is required");
            }
            // Encode explicitly as UTF-8 rather than relying on the platform default charset.
            byte[] body = info.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            Message message = new Message("t_TopicTest", "Tag1", "12345", body);
            // Asynchronous send: the callback receives the send result or the failure later.
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("传输成功");
                    log.info("{}", JSON.toJSONString(sendResult));
                }

                @Override
                public void onException(Throwable e) {
                    log.error("传输失败", e);
                }
            });
        }

    }
    
    

    github:

    相关文章

      网友评论

          本文标题:SpringBoot集成RocketMQ

          本文链接:https://www.haomeiwen.com/subject/ipugihtx.html