美文网首页rocketmq
RocketMq消息监听程序中消除大量的if..else

RocketMq消息监听程序中消除大量的if..else

作者: 索隆大大 | 来源:发表于2018-03-03 01:25 被阅读1135次

    上一篇:Spring Boot 整合 RocketMq

    RocketMq消息监听程序消除大量的if..else

    承接上一篇文章,如果消费端订阅了多个topic和tag,则需要在消息监听器类中添加if..else,根据topic和tag处理不同的业务逻辑,使得消息监听类职责过重。

    大概思路

    消息监听器类只负责监听消息,获取到消息后通过topic和tag路由到需要调用的服务,消费者只需要编写对应的topic和tag的服务。

    为了监听器类可以通过topic和tag路由到需要调用的服务,自定义一个消费者注解MQConsumeService(该注解包含topic和tag的定义),消费者实现类上添加该注解,然后就可以在监听器类中通过反射获取到实现类上面注解的topic和tag,和监听到的topic和tag比较,如果相同则调用该服务。

    定义MQConsumeService注解

    package com.clouds.common.rocketmq.annotation;
    
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    import org.springframework.stereotype.Service;
    
    import com.clouds.common.rocketmq.TopicEnum;
    
    
    /**
     * 此注解用于标注消费者服务
     * .<br/>
     * 
     * Copyright: Copyright (c) 2017  zteits
     * 
     * @ClassName: MQConsumeService
     * @Description: 
     * @version: v1.0.0
     * @author: zhaowg
     * @date: 2018年3月2日 下午1:15:52
     * Modification History:
     * Date             Author          Version            Description
     *---------------------------------------------------------*
     * 2018年3月2日      zhaowg           v1.0.0               创建
     */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE})
    @Service
    public @interface MQConsumeService {
        /**
         * 消息主题
         */
         TopicEnum topic();
    
        /**
         * 消息标签,如果是该主题下所有的标签,使用“*”
         */
         String[] tags();
    
    
    }
    
    

    定义消费者返回消息Bean

    package com.clouds.common.rocketmq.consumer.processor;
    
    import java.io.Serializable;
    /**
     * 消费结果
     * .<br/>
     * 
     * Copyright: Copyright (c) 2017  zteits
     * 
     * @ClassName: MQConsumeResult
     * @Description: 
     * @version: v1.0.0
     * @author: zhaowg
     * @date: 2018年3月1日 上午11:12:55
     * Modification History:
     * Date             Author          Version            Description
     *---------------------------------------------------------*
     * 2018年3月1日      zhaowg           v1.0.0               创建
     */
    public class MQConsumeResult implements Serializable{
    
        private static final long serialVersionUID = 1L;
        /**
         * 是否处理成功
         */
        private boolean isSuccess;
        /**
         * 如果处理失败,是否允许消息队列继续调用,直到处理成功,默认true
         */
        private boolean isReconsumeLater = true;
        /**
         * 是否需要记录消费日志,默认不记录
         */
        private boolean isSaveConsumeLog = false;
        /**
         * 错误Code
         */
        private String errCode;
        /**
         * 错误消息
         */
        private String errMsg;
        /**
         * 错误堆栈
         */
        private Throwable e;
        
        //省略set和get方法
    }
    

    定义统一的消息处理接口

    package com.clouds.common.rocketmq.consumer.processor;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * 消息队列-消息消费处理接口
     * .<br/>
     * 
     * Copyright: Copyright (c) 2017  zteits
     * 
     * @ClassName: MQMsgProcessorService
     * @Description: 
     * @version: v1.0.0
     * @author: zhaowg
     * @date: 2018年3月1日 上午9:57:57
     * Modification History:
     * Date             Author          Version            Description
     *---------------------------------------------------------*
     * 2018年3月1日      zhaowg           v1.0.0               创建
     */
    public interface MQMsgProcessor {
        /**
         * 消息处理<br/>
         * 如果没有return true ,consumer会重新消费该消息,直到return true<br/>
         * consumer可能重复消费该消息,请在业务端自己做是否重复调用处理,该接口设计为幂等接口
         * @param topic 消息主题
         * @param tag 消息标签
         * @param msgs 消息
         * @return
         * 2018年3月1日 zhaowg
         */
        MQConsumeResult handle(String topic, String tag, List<MessageExt> msgs);
    }
    
    

    定义抽象类实现上面的MQMsgProcessor接口

    package com.clouds.common.rocketmq.consumer.processor;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.alibaba.rocketmq.common.message.MessageConst;
    import com.alibaba.rocketmq.common.message.MessageExt;
    /**
     * 所有消息处理继承该类
     * .<br/>
     * 
     * Copyright: Copyright (c) 2017  zteits
     * 
     * @ClassName: AbstractMQMsgProcessorService
     * @Description: 
     * @version: v1.0.0
     * @author: zhaowg
     * @date: 2018年3月1日 上午11:14:25
     * Modification History:
     * Date             Author          Version            Description
     *---------------------------------------------------------*
     * 2018年3月1日      zhaowg           v1.0.0               创建
     */
    public abstract class AbstractMQMsgProcessor implements MQMsgProcessor{
        
        protected static final Logger logger = LoggerFactory.getLogger(AbstractMQMsgProcessor.class);
        
        @Override
        public MQConsumeResult handle(String topic, String tag, List<MessageExt> msgs) {
            MQConsumeResult mqConsumeResult = new MQConsumeResult();
            /**可以增加一些其他逻辑*/
            
            for (MessageExt messageExt : msgs) {
                //消费具体的消息,抛出钩子供真正消费该消息的服务调用
                mqConsumeResult = this.consumeMessage(tag,messageExt.getKeys()==null?null:Arrays.asList(messageExt.getKeys().split(MessageConst.KEY_SEPARATOR)),messageExt);
            }
            
            /**可以增加一些其他逻辑*/
            return mqConsumeResult;
        }
        /**
         * 消息某条消息
         * @param tag 标签
         * @param keys 消息关键字
         * @param messageExt
         * @return
         * 2018年3月1日 zhaowg
         */
        protected abstract MQConsumeResult consumeMessage(String tag,List<String> keys, MessageExt messageExt);
    
    }
    
    

    修改后的消费者监听器类

    package com.clouds.common.rocketmq.consumer.processor;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.stream.Collectors;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.clouds.common.rocketmq.annotation.MQConsumeService;
    import com.clouds.common.rocketmq.constants.RocketMQErrorEnum;
    import com.clouds.common.rocketmq.exception.AppException;
    import com.clouds.common.rocketmq.exception.RocketMQException;
    /**
     * 消费者消费消息路由
     * .<br/>
     * 
     * Copyright: Copyright (c) 2017  zteits
     * 
     * @ClassName: RocketMQMessageListenerConcurrentlyProcessor
     * @Description: 
     * @version: v1.0.0
     * @author: zhaowg
     * @date: 2018年2月28日 上午11:12:32
     * Modification History:
     * Date             Author          Version            Description
     *---------------------------------------------------------*
     * 2018年2月28日      zhaowg           v1.0.0               创建
     */
    @Component
    public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{
        private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
        @Autowired
        private Map<String,MQMsgProcessor> mqMsgProcessorServiceMap;
        /**
         *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
         *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            if(CollectionUtils.isEmpty(msgs)){
                logger.info("接受到的消息为空,不处理,直接返回成功");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            ConsumeConcurrentlyStatus concurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            try{
                //根据Topic分组
                Map<String, List<MessageExt>> topicGroups = msgs.stream().collect(Collectors.groupingBy(MessageExt::getTopic));
                for (Entry<String, List<MessageExt>> topicEntry : topicGroups.entrySet()) {
                    String topic = topicEntry.getKey();
                    //根据tags分组
                    Map<String, List<MessageExt>> tagGroups = topicEntry.getValue().stream().collect(Collectors.groupingBy(MessageExt::getTags));
                    for (Entry<String, List<MessageExt>> tagEntry : tagGroups.entrySet()) {
                        String tag = tagEntry.getKey();
                        //消费某个主题下,tag的消息
                        this.consumeMsgForTag(topic,tag,tagEntry.getValue());
                    }
                }
            }catch(Exception e){
                logger.error("处理消息失败",e);
                concurrentlyStatus = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // 如果没有return success ,consumer会重新消费该消息,直到return success
            return concurrentlyStatus;
        }
        /**
         * 根据topic 和 tags路由,查找消费消息服务
         * @param topic
         * @param tag
         * @param value
         * 2018年3月1日 zhaowg
         */
        private void consumeMsgForTag(String topic, String tag, List<MessageExt> value) {
            //根据topic 和  tag查询具体的消费服务
            MQMsgProcessor imqMsgProcessor = selectConsumeService(topic, tag);
            
            try{
                if(imqMsgProcessor==null){
                    logger.error(String.format("根据Topic:%s和Tag:%s 没有找到对应的处理消息的服务",topic,tag));
                    throw new RocketMQException(RocketMQErrorEnum.NOT_FOUND_CONSUMESERVICE);
                }
                logger.info(String.format("根据Topic:%s和Tag:%s 路由到的服务为:%s,开始调用处理消息",topic,tag,imqMsgProcessor.getClass().getName()));
                //调用该类的方法,处理消息
                MQConsumeResult mqConsumeResult = imqMsgProcessor.handle(topic,tag,value);
                if(mqConsumeResult==null){
                    throw new RocketMQException(RocketMQErrorEnum.HANDLE_RESULT_NULL);
                }
                if(mqConsumeResult.isSuccess()){
                    logger.info("消息处理成功:"+JSON.toJSONString(mqConsumeResult));
                }else{
                    throw new RocketMQException(RocketMQErrorEnum.CONSUME_FAIL,JSON.toJSONString(mqConsumeResult),false);
                }
                if(mqConsumeResult.isSaveConsumeLog()){
                    logger.debug("开始记录消费日志");
                    //TODO 记录消费日志
                }
            }catch(Exception e){
                if(e instanceof AppException){
                    AppException mqe = (AppException)e;
                    //TODO 记录消费失败日志
                    throw new AppException(mqe.getErrCode(),mqe.getErrMsg(),false); 
                }else{
                    //TODO 记录消费失败日志
                    throw e;
                }
            }
        }
        /**
         * 根据topic和tag查询对应的具体的消费服务
         * @param topic
         * @param tag
         * @return
         * 2018年3月3日 zhaowg
         */
        private MQMsgProcessor selectConsumeService(String topic, String tag) {
            MQMsgProcessor imqMsgProcessor = null;
            for (Entry<String, MQMsgProcessor> entry : mqMsgProcessorServiceMap.entrySet()) {
                //获取service实现类上注解的topic和tags
                MQConsumeService consumeService = entry.getValue().getClass().getAnnotation(MQConsumeService.class);
                if(consumeService == null){
                    logger.error("消费者服务:"+entry.getValue().getClass().getName()+"上没有添加MQConsumeService注解");
                    continue;
                }
                String annotationTopic = consumeService.topic().getCode();
                if(!annotationTopic.equals(topic)){
                    continue;
                }
                String[] tagsArr = consumeService.tags();
                //"*"号表示订阅该主题下所有的tag
                if(tagsArr[0].equals("*")){
                    //获取该实例
                    imqMsgProcessor = entry.getValue();
                    break;
                }
                boolean isContains = Arrays.asList(tagsArr).contains(tag);
                if(isContains){
                    //获取该实例
                    imqMsgProcessor = entry.getValue();
                    break;
                }
            }
            return imqMsgProcessor;
        }
    
    }
    

    至此,通过topic和tag路由服务已经写完了,现在以新增订阅一个DemoTopic主题为例,编写消息接受方。

    新建消费者类,继承AbstractMQMsgProcessor类

    注意:该类必须继承AbstractMQMsgProcessor,且添加MQConsumeService注解,用于定义该类是针对那个topic和tag的消费服务。新增订阅主题,还需要在application.propertis中修改rocketmq.consumer.topics。

    package com.clouds.common.demo;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.clouds.common.rocketmq.TopicEnum;
    import com.clouds.common.rocketmq.annotation.MQConsumeService;
    import com.clouds.common.rocketmq.consumer.processor.AbstractMQMsgProcessor;
    import com.clouds.common.rocketmq.consumer.processor.MQConsumeResult;
    
    @MQConsumeService(topic=TopicEnum.DemoTopic,tags={"*"})
    public class DemoNewTopicConsumerMsgProcessImpl extends AbstractMQMsgProcessor{
    
        @Override
        protected MQConsumeResult consumeMessage(String tag, List<String> keys, MessageExt messageExt) {
            String msg = new String(messageExt.getBody());
            logger.info("获取到的消息为:"+msg);
            //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
            
            //如果注解中tags数据中包含多个tag或者是全部的tag(*),则需要根据tag判断是那个业务,
            //如果注解中tags为具体的某个tag,则该服务就是单独针对tag处理的
            if(tag.equals("某个tag")){
                //做某个操作
            }
            //TODO 获取该消息重试次数
            int reconsume = messageExt.getReconsumeTimes();
            //根据消息重试次数判断是否需要继续消费
            if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
                
            }
            MQConsumeResult result = new MQConsumeResult();
            result.setSuccess(true);
            return result;
        }
    
    }
    
    
    rocketmq.consumer.topics=DemoTopic~*;
    

    运行测试类观察日志

    
      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::        (v1.5.2.RELEASE)
    
    2018-03-03 16:54:25.079  INFO 12496 --- [           main] c.c.c.SpringBootRocketMqApplication      : Starting SpringBootRocketMqApplication on DESKTOP-HS6GR38 with PID 12496 (D:\wsforjava\springboot-rocketmq\target\classes started by suolo in D:\wsforjava\springboot-rocketmq)
    2018-03-03 16:54:25.086  INFO 12496 --- [           main] c.c.c.SpringBootRocketMqApplication      : No active profile set, falling back to default profiles: default
    2018-03-03 16:54:25.201  INFO 12496 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@236e3f4e: startup date [Sat Mar 03 16:54:25 CST 2018]; root of context hierarchy
    2018-03-03 16:54:27.073  INFO 12496 --- [           main] c.c.c.r.p.MQProducerConfiguration        : producer is start ! groupName:[springboot-rocketmq],namesrvAddr:[47.97.8.22:9876]
    2018-03-03 16:54:27.430  INFO 12496 --- [           main] c.c.c.r.c.MQConsumerConfiguration        : consumer is start !!! groupName:springboot-rocketmq,topics:DemoTopic~*;,namesrvAddr:47.97.8.22:9876
    2018-03-03 16:54:27.747  INFO 12496 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
    2018-03-03 16:54:27.764  INFO 12496 --- [           main] c.c.c.SpringBootRocketMqApplication      : Started SpringBootRocketMqApplication in 3.213 seconds (JVM running for 3.814)
    2018-03-03 16:54:27.784  INFO 12496 --- [MessageThread_1] .c.c.r.c.p.MQConsumeMsgListenerProcessor : 根据Topic:DemoTopic和Tag:DemoTag 路由到的服务为:com.clouds.common.demo.DemoNewTopicConsumerMsgProcessImpl,开始调用处理消息
    2018-03-03 16:54:30.162  INFO 12496 --- [MessageThread_1] c.c.c.r.c.p.AbstractMQMsgProcessor       : 获取到的消息为:demo msg test
    2018-03-03 16:54:30.194  INFO 12496 --- [MessageThread_1] .c.c.r.c.p.MQConsumeMsgListenerProcessor : 消息处理成功:{"reconsumeLater":true,"saveConsumeLog":false,"success":true}
    
    

    项目源码

    https://gitee.com/zhaowg3/springboot-rocketmq/tree/master/

    完成。

    相关文章

      网友评论

        本文标题:RocketMq消息监听程序中消除大量的if..else

        本文链接:https://www.haomeiwen.com/subject/fpozxftx.html