美文网首页
实战代码(七):Springboot集成RocketMQ

实战代码(七):Springboot集成RocketMQ

作者: LY丶Smile | 来源:发表于2020-12-10 21:51 被阅读0次

    一、理论基础

    1.1 RocketMQ能用来做什么

    消息通讯

    消息通讯是最基本也是最为简单的应用。比较典型的一个应用场景就是没有公网IP的情况下,外界服务无法访问接口,可以使用消息队列来订阅事件来实现双向通信。

    异步处理

    对于处理频繁且不需要即时反馈的场景来讲,RocketMQ具备良好的性能,而且比较优秀的消息堆积处理能力对于异步操作来说也是加分项。

    其余功能

    比如流量削峰、应用解耦等,具体可看下网上对于该功能的详细讲解,本文不做深入。

    1.2 基础概念

    • Topic:主题,一级消息类型,可以配合Tag使用做细致区分,不同类型的消息设置不同Topic

    • Tag:消息标签,二级消息类型,用于进一步区分某个Topic下的消息分类

    • Producer:生产者,发送消息

    • Consumer:消费者,一个消息可以被多个消费者订阅

    • Consumer Group:消费者分组,为了实现集群消费,不同Consumer Group之间消费进度彼此不受影响,一个Consumer Group下包含多个Consumer实例

    • Producer Group:生产者分组,标识发送同一类消息的Producer,通常发送逻辑一致,一个Producer Group可以发送多个Topic消息

    1.3 简单说明

    GitHub上有一个开源的RocketMQ工具:RocketMQ-Spring
    感兴趣的可以研究一下,功能实现很完整。

    二、实战代码

    2.1 依赖引入

    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>4.7.0</version>
    </dependency>
    
    <!-- 自定义的元数据依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    

    2.2 配置项

    rocketmq:
      # rocketmqClient日志路径,默认是系统登录用户的根目录
      producer:
        clientLogDir: logs/rocketmq_client
        # 日志级别
        clientLogLevel: WARN
        namesrvAddr: 127.0.0.1:9876
        groupName: test
        retryTimesWhenSendAsyncFailed: 1
        sendMsgTimeout: 6000
        brokerName: broker-a
      consumer:
        # 日志级别
        clientLogLevel: WARN
        namesrvAddr: 127.0.0.1:9876
        groupName: test
        threadMax: 20
        threadMin: 10
    

    备注:对于服务器硬盘不大的机器来讲,一定要记得设置RocketMQ的日志级别和路径等,否则增长极快的日志文件很快就会将你的硬盘塞满。而且如果没有后续的日志搜集与分析需求,很多日志没必要打印。

    2.3 配置文件读取

    2.3.1 producer

    /**
     * RocketMQ Producer 配置项
     * @author smile
     */
    @Component
    @ConfigurationProperties(prefix = "rocketmq.producer")
    @Data
    public class ProducerProperties {
    
        private String clientLogDir;
        private String clientLogLevel;
        private String namesrvAddr;
        private String groupName;
        private int retryTimesWhenSendAsyncFailed;
        private int sendMsgTimeout;
    
    }
    

    2.3.2 consumer

    /**
     * RocketMQ consumer配置项
     * @author smile
     */
    @Component
    @ConfigurationProperties(prefix = "rocketmq.consumer")
    @Data
    public class ConsumerProperties {
    
        private String namesrvAddr;
        private int threadMax;
        private int threadMin;
        private String groupName;
        private String clientLogDir;
        private String clientLogLevel;
    }
    

    2.4 producer初始化

    /**
     * 程序启动时初始化Producer
     * @author smile
     */
    @Configuration
    @Slf4j
    public class ProducerConfig {
    
        private final ProducerProperties properties;
    
        public ProducerConfig(ProducerProperties properties) {
            this.properties = properties;
        }
    
        @Bean
        public DefaultMQProducer getRocketMQProducer() throws MQClientException {
            setClientProperty();
            DefaultMQProducer producer = new DefaultMQProducer(properties.getGroupName());
            producer.setNamesrvAddr(properties.getNamesrvAddr());
            producer.setRetryTimesWhenSendAsyncFailed(properties.getRetryTimesWhenSendAsyncFailed());
            producer.setSendMsgTimeout(properties.getSendMsgTimeout());
            producer.start();
            log.info("*** producer has started! groupName:[{}], namesrvAddr:[{}] ***", properties.getGroupName(), properties.getNamesrvAddr());
            return producer;
        }
    
        private void setClientProperty() {
            System.setProperty(ClientLogger.CLIENT_LOG_ROOT, properties.getClientLogDir());
            System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, properties.getClientLogLevel());
        }
    }
    

    2.5 consumer示例代码

    2.5.1 Consumer管理程序

    1. 本实例基于一个Group的Consumer。简单测试过,未做生产环境的深度测试
    2. 可实现简单的Consumer初始化、新增订阅、取消订阅功能
    /**
     * @author smile
     */
    @Slf4j
    public class ConsumerManager {
    
        private static final ConsumerManager MANAGER = new ConsumerManager();
        private static final String TAGS_SEP = "||";
        private static Map<String, String> subscription = new HashMap<String, String>(8) {
            {
                // 系统消息:订阅与取消订阅的事件
                put("sys", "subscribe||unsubscribe");
            }
        };
    
        private static DefaultMQPushConsumer consumer;
    
    
        /**
         * 单例,不允许外界主动实例化
         */
        private ConsumerManager() {
        }
    
        public static ConsumerManager getInstance() {
            return MANAGER;
        }
    
        /**
         * 初始化Consumer,本示例初始化一个ConsumerGroup
         * 后续所有的订阅与取消订阅都是在一个consumer实例下进行
         */
        public void initConsumer(ConsumerProperties properties) throws MQClientException {
            // 设置client日志信息, producer初始化时已配置,此处不再配置
    //        System.setProperty(ClientLogger.CLIENT_LOG_ROOT, properties.getClientLogDir());
    //        System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, properties.getClientLogLevel());
    
            consumer = new DefaultMQPushConsumer(properties.getGroupName());
            consumer.setNamesrvAddr(properties.getNamesrvAddr());
            consumer.setConsumeThreadMax(properties.getThreadMax());
            consumer.setConsumeThreadMin(properties.getThreadMin());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            log.info("consumer topic and tags : {}", subscription);
            for (Map.Entry<String, String> entry : subscription.entrySet()) {
                consumer.subscribe(entry.getKey(), entry.getValue());
            }
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                try {
                    MessageExt msg = msgs.get(0);
                    String msgBody = new String(msg.getBody(), "utf-8");
                    log.info("receive message, messageId:[{}], messageBody:{}, topic:[{}], tag:[{}]",
                            msg.getMsgId(), msgBody, msg.getTopic(), msg.getTags());
                    log.info("delay: [{}] ms", (System.currentTimeMillis() - msg.getBornTimestamp()));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    // 如果执行异常,则稍后会重新消费
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            log.info("consumer has started! NamesrvAddr:[{}], groupName:[{}]", properties.getNamesrvAddr(), properties.getGroupName());
        }
    
    
        /**
         * 订阅新的事件
         * @param topic topic
         * @param tags tags,支持多个tag订阅,格式:TagA||TagB
         * @throws MQClientException
         */
        public void subscribe(String topic, String tags) throws MQClientException {
            if (subscription.containsKey(topic)) {
                tags = StringUtils.join(subscription.get(topic), TAGS_SEP, tags);
            }
            consumer.subscribe(topic, tags);
            subscription.put(topic, tags);
            log.info("!!刷新订阅, {}", subscription);
        }
    
        /**
         * 取消订阅
         */
        public void unsubscribe(String topic, String tags) throws MQClientException {
            if (!subscription.containsKey(topic)) {
                log.error("topic is not found");
                return;
            }
            String[] unsubscribeTags = tags.trim().split("\\|\\|");
            String[] existingTags = subscription.get(topic).trim().split("\\|\\|");
            log.info("unsubscribeTags: {}, existingTags: {}", unsubscribeTags, existingTags);
            StringBuilder newTagsBuilder = new StringBuilder();
            for (String existingTag : existingTags) {
                if (!ArrayUtils.contains(unsubscribeTags, existingTag)) {
                    newTagsBuilder.append(existingTag).append(TAGS_SEP);
                }
            }
    
            if (tags.length() == 0) {
                consumer.unsubscribe(topic);
                return;
            }
    
            String newTags = newTagsBuilder.substring(0, newTagsBuilder.length() - 2);
            log.info("newTags: {}", newTagsBuilder);
            consumer.subscribe(topic, newTags);
            subscription.put(topic, newTags);
            log.info("!!取消订阅,新的订阅列表, {}", subscription);
        }
    
    }
    

    2.5.2 初始化Consumer

    /**
     * @author smile
     */
    @Component
    public class ConsumerInit implements CommandLineRunner {
    
        private final ConsumerProperties properties;
    
        public ConsumerInit(ConsumerProperties properties) {
            this.properties = properties;
        }
    
        @Override
        public void run(String... args) throws Exception {
            ConsumerManager.getInstance().initConsumer(properties);
        }
    }
    

    三、源码地址

    完整示例代码,请移步GitHub

    相关文章

      网友评论

          本文标题:实战代码(七):Springboot集成RocketMQ

          本文链接:https://www.haomeiwen.com/subject/baykgktx.html