美文网首页
记RocketMQ的一次使用

记RocketMQ的一次使用

作者: 瓜尔佳_半阙 | 来源:发表于2018-10-26 15:49 被阅读56次

    Producer 的代码基本都一样,详情参看官网样例即可。
    这里记录一下 Consumer 的其中一种消费方式。

    Consumer 配置类

    package com.yeshen.server.initConfig;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration that builds, configures and starts the RocketMQ
     * push consumer exposed as a bean.
     *
     * <p>All settings are injected from {@code rocketmq.*} properties. The
     * consumer is started inside the bean factory method, so a failure to
     * start surfaces as a bean-creation failure.
     */
    @Configuration
    public class ConsumerConfig {
        private static final Logger logger = LoggerFactory.getLogger(ConsumerConfig.class);

        /** Name server address passed to the consumer. */
        @Value("${rocketmq.namesrv.addr}")
        private String namesrvAddr;

        // NOTE(review): injected but not read anywhere in this class; the
        // subscription below uses a hard-coded topic/tag expression instead.
        // Confirm whether this property is meant to drive subscribe().
        @Value("${rocketmq.consumer.topics}")
        private String topics;

        /** Upper bound of the consume thread pool, as a decimal string. */
        @Value("${rocketmq.consumer.threadMax}")
        private String consumeThreadMax;

        /** Lower bound of the consume thread pool, as a decimal string. */
        @Value("${rocketmq.consumer.threadMin}")
        private String consumeThreadMin;

        /** Consumer group the push consumer is created with. */
        @Value("${rocketmq.consumer.groupName}")
        private String consumerGroupName;

        @Autowired
        private MQMessageListenerProcessor mqMessageListenerProcessor;

        /**
         * Creates the push consumer, subscribes it, registers the listener and
         * starts it.
         *
         * @return the started consumer
         * @throws IllegalStateException if the group name or name server
         *         address is missing or blank
         * @throws NumberFormatException if a thread-count property is not an
         *         integer
         * @throws Exception if subscribing or starting the consumer fails
         */
        @Bean
        public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
            // isBlank also rejects whitespace-only values, which isEmpty let through.
            if (StringUtils.isBlank(consumerGroupName)) {
                logger.error("consumer group name is null");
                throw new IllegalStateException("consumer group name is null");
            }
            if (StringUtils.isBlank(namesrvAddr)) {
                logger.error("namesrv address is null");
                throw new IllegalStateException("namesrv address is null");
            }

            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);
            consumer.setNamesrvAddr(namesrvAddr);
            consumer.setConsumeThreadMin(Integer.parseInt(consumeThreadMin));
            consumer.setConsumeThreadMax(Integer.parseInt(consumeThreadMax));
            consumer.subscribe("TopicTest", "TagA || TagB || TagC || TagD || TagE");
            consumer.registerMessageListener(mqMessageListenerProcessor);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.start();
            logger.info("--------- consumer start success. ------------");
            return consumer;
        }
    }
    
    

    MessageListener

    package com.yeshen.server.initConfig;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.yeshen.server.commons.commonVo.Query;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    
    /**
     * Concurrent RocketMQ listener that decodes each message body as JSON and
     * hands its contents to {@code UploadDataRepository.uploadDataToOdps}.
     *
     * <p>A message whose processing throws is asked to be redelivered until it
     * has been redelivered {@link #MAX_RECONSUME_TIMES} times; after that it is
     * logged and dropped so that it stops blocking the queue.
     */
    @Component
    public class MQMessageListenerProcessor implements MessageListenerConcurrently {

        private static final Logger logger = LoggerFactory.getLogger(MQMessageListenerProcessor.class);

        /** Redelivery count at or above which a failing message is logged and dropped. */
        private static final int MAX_RECONSUME_TIMES = 3;

        // NOTE(review): injected but not read anywhere in this class.
        @Value("${rocketmq.consumer.topics}")
        private String topics;

        @Autowired
        private UploadDataRepository uploadDataRepository;

        /**
         * Processes a batch of messages.
         *
         * @param list messages delivered by the client; may be null or empty
         * @param consumeConcurrentlyContext delivery context (not used here)
         * @return {@code RECONSUME_LATER} as soon as a message fails and has not
         *         yet reached the retry limit, otherwise {@code CONSUME_SUCCESS}
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if (CollectionUtils.isEmpty(list)) {
                logger.info("message list is null,return success");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            for (MessageExt msg : list) {
                try {
                    JSONObject payload = parsePayload(msg);
                    Query query = parseQuery(payload);
                    List<String> packageList = payload.getJSONArray("list").toJavaList(String.class);
                    Integer vcode = payload.getInteger("vcode");
                    uploadDataRepository.uploadDataToOdps(query, packageList, vcode + "", 1);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    // ">=" rather than "==": a message that slipped past the limit
                    // once must still be dropped on the next delivery.
                    if (msg.getReconsumeTimes() < MAX_RECONSUME_TIMES) {
                        // NOTE(review): this presumably causes the whole batch to be
                        // redelivered, including messages already uploaded above —
                        // confirm against the configured consume batch size.
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    logDroppedMessage(msg);
                    // Fall through to the next message instead of returning, so the
                    // rest of the batch is not acknowledged without being processed.
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        /** Decodes the message body as UTF-8 text and parses it as a JSON object. */
        private static JSONObject parsePayload(MessageExt msg) {
            // Explicit charset: new String(byte[]) depends on the JVM default.
            // NOTE(review): assumes producers encode the body as UTF-8 — confirm.
            String jsonStr = new String(msg.getBody(), StandardCharsets.UTF_8);
            return (JSONObject) JSONObject.parse(jsonStr);
        }

        /** Extracts the {@code queryVo} member of the payload as a {@link Query}. */
        private static Query parseQuery(JSONObject payload) {
            JSONObject queryJson = (JSONObject) payload.get("queryVo");
            return JSON.parseObject(queryJson.toJSONString(), Query.class);
        }

        /**
         * Writes the final log record for a message that is being given up on.
         * Never throws: if the body cannot be parsed (which may be the very reason
         * the message failed), the raw body is logged instead.
         */
        private void logDroppedMessage(MessageExt msg) {
            try {
                JSONObject payload = parsePayload(msg);
                Query query = parseQuery(payload);
                List<String> packageList = payload.getJSONArray("list").toJavaList(String.class);
                Integer vcode = payload.getInteger("vcode");

                JSONObject logJson = new JSONObject();
                logJson.put("country", query.getIsoCode().toLowerCase());
                logJson.put("list", packageList);
                logJson.put("ip", query.getIp());
                logJson.put("vcode", vcode);
                logger.info("not_exists_rule_applist" + logJson.toJSONString());
            } catch (Exception parseFailure) {
                byte[] body = msg.getBody();
                String rawBody = body == null ? null : new String(body, StandardCharsets.UTF_8);
                logger.error("dropping unparseable message after {} retries, msgId={}, body={}",
                        msg.getReconsumeTimes(), msg.getMsgId(), rawBody, parseFailure);
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:记RocketMQ的一次使用

          本文链接:https://www.haomeiwen.com/subject/iefltqtx.html