美文网首页Spring Boot实践记录
StringBoot集成Rabbit,根据业务返回ACK

StringBoot集成Rabbit,根据业务返回ACK

作者: Chinesszz | 来源:发表于2017-04-26 20:39 被阅读1433次
    为了维护消息的有效性,当消费消息时候处理失败时候,不进行消费,需要我们根据业务区返回ACK,本项目我使用Redis和ack机制双重保险,保障消息一定能够正确的消费
    • 首先,接着上部分内容,使用Topic,机制(不明白的,可以回顾上部分内容)

    • 上部分内容,我们使用SpringBoot注解,去实现,但是控制权不完全账务,当进行大规模项目时候,不太建议使用

    
     @RabbitListener(queues = TopicRabbitConfig.USER_QUEUE)
        @RabbitHandler
        public void processUser(String message) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    logger.info("用户侧流水:{}",message);
                }
            });
        }
    
    • 根据源码分析,当然这里不分析源码,有兴趣的可以多失败几次就ok明白了

    • 在配置类中定义监听器,监听这个序列(AcknowledgeMode.MANUAL是必须的哦)

    
        /**
         * 接受消息的监听,这个监听客户交易流水的消息
         * 针对消费者配置
         * @return
         */
        @Bean
        public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, TransactionConsumeImpl transactionConsume) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueues(queueMessage());
            container.setExposeListenerChannel(true);
            container.setMaxConcurrentConsumers(1);
            container.setConcurrentConsumers(1);
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
            container.setMessageListener(transactionConsume);
            return container;
        }
    
    

    这个 TransactionConsumeImpl 要继承ChannelAwareMessageListener,主要说的手动返回ACK就是channel。调用

    
    @Component
    public class TransactionConsumeImpl implements ChannelAwareMessageListener {
        private static final Logger logger = LoggerFactory.getLogger(TransactionConsumeImpl.class);
        private static final Gson gson = new Gson();
    
        @Autowired
        JedisShardInfo jedisShardInfo;
        @Autowired
        ExecutorService threadPool;
        @Autowired
        BoluomeFlowService boluomeFlowService;
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            String boby = new String(message.getBody(), "utf-8");//转换消息,我们是使用json数据格式
            threadPool.execute(new Runnable() {   //多线程处理
                @Override
                public void run() {
                    Jedis jedis = jedisShardInfo.createResource();
                    jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//添加到key为当前消息类型的集合里面,防止丢失消息
                    BoluomeFlow flow = gson.fromJson(boby, BoluomeFlow.class);
                    String json = gson.toJson(flow);
                    if (boluomeFlowService.insert(flow)) {  //当添加成功时候返回成功
                        logger.info("客户交易流水添加1条记录:{}", json);
                        jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//从当前消息类型集合中移除已经消费过的消息
                        try {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工返回ACK,通知此消息已经争取消费
                        } catch (IOException ie) {
                            logger.error("消费成功回调成功,io操作异常");
                        }
                    } else {
                        logger.info("客户交易流水添加失败记录:{}", json);
                    }
                }
            });
        }
    }
    
    

    相关文章

      网友评论

      • Chinesszz:这中对于消息的消费,保证了消息的安全性,不可丢失,但是大部分时间会浪费在数据的io操作上,经测试每秒数据不会超过20,这个完全不能忍,接下来会逐步提高,消息的处理量,看下一篇教程吧

      本文标题:StringBoot集成Rabbit,根据业务返回ACK

      本文链接:https://www.haomeiwen.com/subject/ngewzttx.html