美文网首页
rabbitmq消息100%投递成功方案(一)

rabbitmq消息100%投递成功方案(一)

作者: 寂静的春天1988 | 来源:发表于2019-06-04 15:03 被阅读0次

    参考视频:https://www.imooc.com/video/17854
    实现思路:
    一个order表和一个记录消息的表。
    每次新增一条订单时新增一条订单记录和一条消息记录,设置初始的重试状态为0次,投递状态为投递中,
    写一个定时器每10秒钟扫描,将过期的消息并且投递状态为投递中的重新投递,超过三次投递的将投递状
    态设置为投递失败,人为的去处理该消息。
    利用rabbitmq的confirm消息确认机制。每次ack成功则将borker_msg的投递状态改为投递成功

    个人疑问:
    这样的设计方案并没有感觉什么必要,不如写一个定时器定时扫描未处理的订单,执行方法去修改而已,这样可以防止未处理的订单。说到底,投递消息也不过是要处理订单而已。

    以下是实现代码:

    sql文件

    CREATE TABLE orders (
        id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '订单id',
        msg VARCHAR(100)
    )
    CREATE TABLE borker_msg(
        msg_id VARCHAR(64) COMMENT '消息id',
        order_id BIGINT  COMMENT '订单id',
        retry int COMMENT '重试次数',
        next_retry_time datetime COMMENT '下次重试时间',
        send_status int DEFAULT 0 COMMENT '投递状态 0:投递中,1:投递成功,2:投递失败'    
    )
    

    rabbitconfig文件

    package com.ganlong.config;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @title rabbitmq配置类
     * @author yl
     * @time 2019年5月9日
     * @Description
     */
    @Configuration
    public class RabbitConfig {
        // 创建一个队列
        @Bean
        public Queue demoQueue() {
            // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
            return new Queue("demo_queue", true);
        }
    
        /**
         * 交换机
         * 
         * @return
         */
        @Bean
        public DirectExchange demoExchange() {
            return new DirectExchange("demo_exchange");
        }
    
        /**
         * 将交换机和队列绑定
         * 
         * @return
         */
        @Bean
        public Binding bindingNotify() {
            return BindingBuilder.bind(demoQueue()).to(demoExchange()).with("key");
        }
    }
    

    消费者

    @Component
    public class Receiver {
        
        @RabbitListener(queues = "demo_queue")
        public void get(Message message, Channel channel) throws IOException {
            String msg=new String(message.getBody(),"utf-8");
            System.out.println(message.getBody().toString());
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    

    发送消息方法

    package com.ganlong.config;
    
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.ganlong.meta.DO.BorkerMsg;
    import com.ganlong.repo.BrokerMsgRepo;
    @Component
    public class SendMsg {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Autowired
        private BrokerMsgRepo brokerMsgRepo;
        public void sendDelayMessageByPlugins(Integer orderId,String msg){
            //绑定异步监听回调函数
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
               if (ack) {
                  //修改状态为已投递
                   BorkerMsg borkerMsg=brokerMsgRepo.findByOrderId(orderId);
                   borkerMsg.setSendStatus(1);
                   brokerMsgRepo.save(borkerMsg);
                   System.out.println("投递成功");
               } else {
                   System.out.println("投递失败");
               }
           });
            CorrelationData correlationData=new CorrelationData();
            correlationData.setId(orderId+"");
            rabbitTemplate.convertAndSend("demo_exchange", "key", msg, correlationData);
        }
    }
    

    BrokerMsgRepo

    package com.ganlong.repo;
    
    import java.util.List;
    
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.data.jpa.repository.Query;
    import org.springframework.stereotype.Repository;
    
    import com.ganlong.meta.DO.BorkerMsg;
    @Repository
    public interface BrokerMsgRepo extends JpaRepository<BorkerMsg, String>{
        BorkerMsg findByOrderId(Integer orderId);
        @Query(value="SELECT * FROM `borker_msg` WHERE borker_msg.`send_status`=0 AND `next_retry_time` <NOW() ",nativeQuery=true)
        List<BorkerMsg> findByNextRetryTime();
    }
    
    

    OrderRepo

    package com.ganlong.repo;
    
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.stereotype.Repository;
    
    import com.ganlong.meta.DO.OrdersDO;
    
    @Repository
    public interface OrderRepo extends JpaRepository<OrdersDO,Integer>{
        
    }
    

    orderService

    @Service
    @Transactional(rollbackOn= {Exception.class})
    public class OrderService {
        @Autowired
        private OrderRepo orderRepo;
        @Autowired
        private BrokerMsgRepo brokerMsgRepo;
        @Autowired
        private SendMsg sendMsg;
        public String createOrder() {
            OrdersDO ordersDO=new OrdersDO();
            ordersDO.setMsg("消息");
            OrdersDO order=orderRepo.save(ordersDO);
            BorkerMsg borkerMsg=new BorkerMsg();
            borkerMsg.setMsgId(UUID.randomUUID().toString());
            borkerMsg.setOrderId(order.getId());
            borkerMsg.setRetry(0);
            borkerMsg.setSendStatus(0);
            borkerMsg.setNextRetryTime(LocalDateTime.now().plusMinutes(1L));
            System.out.println("超时="+borkerMsg.getNextRetryTime());
            brokerMsgRepo.save(borkerMsg);
            sendMsg.sendDelayMessageByPlugins(order.getId(), "消息");
            return "创建成功";
        }
    }
    

    timer定时器

    package com.ganlong.repo;
    
    import java.time.LocalDateTime;
    import java.util.List;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    
    import com.ganlong.config.SendMsg;
    import com.ganlong.meta.DO.BorkerMsg;
    /**
      * 定时任务:每10秒钟扫描未投递成功的
     */
    @Configuration
    @EnableScheduling
    public class Timer {
        @Autowired
        private BrokerMsgRepo brokerMsgRepo;
        @Scheduled(cron = "*/10 * * * * ?")
        public void reSend() {
            System.out.println("定时任务执行");
            //将正在投递的,且超时了的消息取出。
            List<BorkerMsg> timeOutMsgList=brokerMsgRepo.findByNextRetryTime();
            for (BorkerMsg borkerMsg : timeOutMsgList) {
                if(borkerMsg.getRetry()<3) {
                    //重新投递
                    System.out.println("borkerMsg="+borkerMsg.getOrderId());
                    new SendMsg().sendDelayMessageByPlugins(borkerMsg.getOrderId(), "消息");
                    //投递次数增加1,超时时间往后推一分钟
                    borkerMsg.setRetry(borkerMsg.getRetry()+1);
                    borkerMsg.setNextRetryTime(LocalDateTime.now().plusMinutes(1));
                }else {
                    borkerMsg.setSendStatus(2);
                }
                brokerMsgRepo.save(borkerMsg);
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:rabbitmq消息100%投递成功方案(一)

          本文链接:https://www.haomeiwen.com/subject/rkmotctx.html