美文网首页
记RocketMQ的一次使用

记RocketMQ的一次使用

作者: 瓜尔佳_半阙 | 来源:发表于2018-10-26 15:49 被阅读56次

Producer 的代码基本都一样,详情参看官网样例即可。
这里记录一下 Consumer 的其中一种消费方式:基于 DefaultMQPushConsumer 的并发消费(MessageListenerConcurrently)。

Consumer 配置类

package com.yeshen.server.initConfig;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds, configures and starts the RocketMQ push consumer bean.
 *
 * <p>All settings are read from {@code rocketmq.*} properties. The consumer is started inside
 * the bean factory method, so it begins receiving messages as soon as the bean is created.
 */
@Configuration
public class ConsumerConfig {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerConfig.class);

    @Value("${rocketmq.namesrv.addr}")
    private String namesrvAddr;

    // NOTE(review): injected but never read below; the subscription is hard-coded to
    // "TopicTest". Confirm whether this property was meant to drive subscribe().
    @Value("${rocketmq.consumer.topics}")
    private String topics;

    @Value("${rocketmq.consumer.threadMax}")
    private String consumeThreadMax;

    @Value("${rocketmq.consumer.threadMin}")
    private String consumeThreadMin;

    @Value("${rocketmq.consumer.groupName}")
    private String consumerGroupName;

    @Autowired
    private MQMessageListenerProcessor mqMessageListenerProcessor;

    /**
     * Creates the push consumer, subscribes it to {@code TopicTest} (tags TagA to TagE),
     * registers the message listener and starts it.
     *
     * @return the started consumer
     * @throws IllegalStateException if the consumer group name or name-server address is blank
     * @throws NumberFormatException if a configured thread count is not an integer
     * @throws Exception             anything raised by the RocketMQ client while subscribing or starting
     */
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
        requireConfigured(consumerGroupName, "consumer group name is null");
        requireConfigured(namesrvAddr, "namesrv address is null");

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(Integer.parseInt(consumeThreadMin));
        consumer.setConsumeThreadMax(Integer.parseInt(consumeThreadMax));
        consumer.subscribe("TopicTest", "TagA || TagB || TagC || TagD || TagE");
        consumer.registerMessageListener(mqMessageListenerProcessor);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.start();
        logger.info("--------- consumer start success. ------------");
        return consumer;
    }

    /**
     * Fails fast when a mandatory setting is missing. Blank (whitespace-only) values are
     * rejected as well, and the failure is logged at ERROR level before being thrown.
     */
    private static void requireConfigured(String value, String message) {
        if (StringUtils.isBlank(value)) {
            logger.error(message);
            throw new IllegalStateException(message);
        }
    }
}

MessageListener

package com.yeshen.server.initConfig;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeshen.server.commons.commonVo.Query;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;

/**
 * Concurrent RocketMQ listener that decodes each message body as JSON and hands the
 * contents to {@code UploadDataRepository.uploadDataToOdps}.
 *
 * <p>A message that fails is retried until it has been redelivered
 * {@link #MAX_RECONSUME_TIMES} times; after that a summary is logged and the message is
 * acknowledged so it stops being redelivered.
 */
@Component
public class MQMessageListenerProcessor implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(MQMessageListenerProcessor.class);

    /** Redelivery count from which a failing message is logged and acknowledged instead of retried. */
    private static final int MAX_RECONSUME_TIMES = 3;

    @Value("${rocketmq.consumer.topics}")
    private String topics;

    @Autowired
    private UploadDataRepository uploadDataRepository;

    /**
     * Processes a batch of messages in order.
     *
     * @param list                       messages delivered by the broker; may be null or empty
     * @param consumeConcurrentlyContext consume context (not used)
     * @return {@code RECONSUME_LATER} as soon as a message fails and is still below the retry
     *         threshold; otherwise {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            logger.info("message list is null,return success");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt msg : list) {
            try {
                upload(msg);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                // Threshold check rather than equality: if an earlier give-up attempt was
                // missed, later redeliveries must still end up on the give-up path.
                if (msg.getReconsumeTimes() < MAX_RECONSUME_TIMES) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                logAbandoned(msg);
                // Keep going: returning here would acknowledge the remaining messages of
                // the batch without ever processing them.
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Parses one message and uploads its contents.
     *
     * @throws Exception on malformed JSON, missing fields, or any failure raised by the repository
     */
    private void upload(MessageExt msg) throws Exception {
        JSONObject payload = JSON.parseObject(decodeBody(msg));
        Query query = JSON.parseObject(payload.getJSONObject("queryVo").toJSONString(), Query.class);
        List<String> packageList = payload.getJSONArray("list").toJavaList(String.class);
        Integer vcode = payload.getInteger("vcode");
        uploadDataRepository.uploadDataToOdps(query, packageList, String.valueOf(vcode), 1);
    }

    /**
     * Logs a summary of a message that is being given up on. Never throws: when the body
     * itself is what cannot be parsed, a fallback line with the message id is logged instead,
     * so the give-up decision is not undone by a second parse failure.
     */
    private void logAbandoned(MessageExt msg) {
        try {
            JSONObject payload = JSON.parseObject(decodeBody(msg));
            Query query = JSON.parseObject(payload.getJSONObject("queryVo").toJSONString(), Query.class);
            List<String> packageList = payload.getJSONArray("list").toJavaList(String.class);
            String isoCode = query.getIsoCode();

            JSONObject logJson = new JSONObject();
            // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
            logJson.put("country", isoCode == null ? null : isoCode.toLowerCase(Locale.ROOT));
            logJson.put("list", packageList);
            logJson.put("ip", query.getIp());
            logJson.put("vcode", payload.getInteger("vcode"));
            logger.info("not_exists_rule_applist" + logJson.toJSONString());
        } catch (Exception e) {
            logger.error("dropping message {} after {} redeliveries; payload could not be summarised",
                    msg.getMsgId(), msg.getReconsumeTimes(), e);
        }
    }

    /**
     * Decodes the raw body with an explicit charset instead of the platform default.
     * NOTE(review): assumes producers encode the body as UTF-8; confirm against the producer.
     */
    private static String decodeBody(MessageExt msg) {
        return new String(msg.getBody(), StandardCharsets.UTF_8);
    }
}

相关文章

  • 记RocketMQ的一次使用

    Producer的代码基本都一样,详情参看官网样例即可。这里记录一下Consumer的其中一种消费方式。 Cons...

  • 2020-07-15 搭建 RocketMQ 服务器最便捷的方法

    最近学习使用 rocketmq,需要搭建 rocketmq 服务端,本文主要记录 rocketmq 搭建过程以及这...

  • 史上最便捷搭建RocketMQ服务器的方法

    最近学习使用 rocketmq,需要搭建 rocketmq 服务端,本文主要记录 rocketmq 搭建过程以及这...

  • canal和rocketmq

    canal使用rocketmq做数据同步的问题 因为要保证数据的顺序,所以要使用rocketmq的顺序topic,...

  • 消息队列

    RocketMQ使用/源码分析 http://jm.taobao.org/2017/01/12/rocketmq-...

  • rocketmq启动

    本篇文章主要讲述rocketmq的控制台安装、rocketmq的启动、使用源码测试一、启动rocketmq参考ht...

  • Apache RocketMQ之JMS基本概念及使用

    Apache RocketMQ之JMS基本概念及使用 Apache RocketMQ 系列: Apache Roc...

  • 使用RocketMQ消费消息

    RocketMQ消费端 今天要来跟大家学习怎样使用RocketMQ来进行消息的消费 先简单创建个Maven项目使用...

  • RocketMQ 顺序消息

    背景: 业务使用 RocketMQ 的场景增多,但是有一些消息状态依赖的场景没有考虑顺序正确的使用RocketMQ...

  • rocketmq使用

    应用 rocket-mq有四个概念 consumer producer name-server broker 如果...

网友评论

      本文标题:记RocketMQ的一次使用

      本文链接:https://www.haomeiwen.com/subject/iefltqtx.html