美文网首页
rocket mq consumer MQConsumeList

rocket mq consumer MQConsumeList

作者: DUJUNHUI | 来源:发表于2020-07-09 10:47 被阅读0次
package com.djh.consumertest.config.consumer.processor;


import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
// MessageListenerConcurrently 并发的处理监听
// MessageListenerOrderly 顺序的处理监听
public class MQConsumeListenerOrderly implements MessageListenerOrderly {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context){
        // msgs中只收集同一个topic,同一个tag,并且key相同的message
        // 会把不同的消息分别放置到不同的队列中

        // 设置自动提交
        context.setAutoCommit(true);
        for(MessageExt msg : msgs) { // 批量获取消息
            try {
                String topic = msg.getTopic();
                String tags = msg.getTags();
                String keys = msg.getKeys();
                String body = new String(msg.getBody(), "utf-8");
                // 模拟业务逻辑处理...
                log.info("Consumer-获取消息-msgId为{}, 主题topic为={}, 标签tags为={},QueueId为={},消费消息为={}", msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getQueueId(), body);
                //                    if(keys.equals("testkey1")){
                //                        System.out.println("消息消费失败");
                //                        int i = 1/0;
                //                    }
                //Order order = JSON.parseObject(body, Order.class);
                //String code = order.getOrderId();
            } catch (Exception e) {
                e.printStackTrace();
                //                int reconsumeTimes = msg.getReconsumeTimes();
                //                System.out.println("失败次数reconsumeTimes:"+ reconsumeTimes);
                //                if(reconsumeTimes == 3){
                //                    // 写入日志
                //                    // 做补偿处理
                //                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //                }
                //                System.out.println("消费失败");
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

相关文章

网友评论

      本文标题:rocket mq consumer MQConsumeList

      本文链接:https://www.haomeiwen.com/subject/flsjcktx.html