美文网首页Java 杂谈技术干货架构设计
基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处

基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处

作者: Joey_Java | 来源:发表于2018-10-18 12:10 被阅读69次

前言

传统处理超时订单
  • 采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好
  • 当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作
jdk延迟队列 DelayQueue
  • 采取jdk自带的延迟队列能很好的优化传统的处理方案,但是该方案的弊、端也是非常致命的,所有的消息数据都是存于内存之中,一旦宕机或重启服务队列中数据就全无了,而且也无法进行扩展。
  • rabbitMQ延时队列方案
    rabbitmq我就不多介绍了,一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

术语 (详情请参照官网文档:http://www.rabbitmq.com/admin-guide.html

存活时间(Time-To-Live 简称 TTL),分别有三种TTL的设置模式
  • x-message-ttl ,该属性是在创建队列的时候 ,在arguments的map中配置;该参数的作用是设置当前队列中所有的消息的存活时间
  • x-expires 该属性也是在arguments中配置;其作用是设置当前队列在N毫秒中(不能为0,且为正整数),就删除该队列;“未使用”意味着队列没有消费者,队列尚未重新声明,并且至少在有效期内未调用basicGet (basicGet 是手动拉取指定队列中的一条消息)
  • AMQP.BasicProperties配置中的exppiration 属性,前两者都是基于队列的TTL,该属性是基于单条消息的TLL用于配置每条消息在队列中的存活时间
死信交换(Dead Letter Exchanges 简称 DLX)
  • ”死信交换“ 可以分开来理解 ;首先是 ”死信“,也就是死亡的信息,无效的信息;造成这样的信息有以下几种情况
    消息被拒绝,即消费者没有成功确认消息被消费
    消息TTL过期
    超出队列长度限制
    当出现这三种情况的时候,队列中的消息就会变为“死信”

  • 再来理解”交换“ 也就是说,当出现"死信"的情况下 rabbitmq 可以对该"死信"进行交换到别的队列上,但是交换的前提是需要为死信配置一个交换机用于死信的交换

代码实现

配置类 RabbitmqConfiguration
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* TODO rabbitmq配置类
*/  
public class RabbitmqConfiguration { 
   private final String SERVER_HOST="127.0.0.1";//rabbitmq 服务器地址
   private final int PORT=5672;//端口号 
   private final String USER_NAME="test";//用户名
   private final String PASSWORD="test";//密码
   private final boolean QUEUE_SAVE =true;//队列是否持久化
   private final String MESSAGE_SAVE = "1" ;//消息持久化  1,0 
   //rabbitmq 连接工厂
   private final ConnectionFactory RAB_FACTORY = new ConnectionFactory();
   private Connection connection;
 
   public void init() throws Exception{
       RAB_FACTORY.setHost(SERVER_HOST);
       RAB_FACTORY.setPort(PORT); 
       RAB_FACTORY.setUsername(USER_NAME);
       RAB_FACTORY.setPassword(PASSWORD);
       this.connection=RAB_FACTORY.newConnection();
   } 
   public Connection getConnection() {
       return connection;
   }

   public void setConnection(Connection connection) {
       this.connection = connection;
   }

   public boolean isQUEUE_SAVE() {
       return QUEUE_SAVE;
   }

   public String getMESSAGE_SAVE() {
       return MESSAGE_SAVE;
   }
       
}
功能类 OrderOverTimeQueue
import java.io.IOException; 
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * TODO 超时未支付订单处理消息队列
 */
public class OrderOverTimeQueue {
    
    private RabbitmqConfiguration rabConf;
      
    //队列名称
    //****==================订单延时队列=======================*****//
    //订单延时队列
    public final String DELAY_QUEUE_NAME = "delay-queue-orderOverTime";
    //订单延时消费队列
    public final String CONSUME_QUEUE_NAME = "consume-queue-orderOverTime";
    //订单延时队列死信交换的交换器名称
    public final String EXCHANGENAME = "exchange-orderOverTime";
    //订单延时队列死信的交换器路由key
    public final String ROUTINGKEY = "routingKey-orderOverTime";
    
    private Channel delayChannel;//延时队列连接通道
    
    private Channel consumerChannel;//消费队列连接通道
    
    public void init() throws Exception{
        //创建连接通道
        delayChannel=rabConf.getConnection().createChannel();  
        consumerChannel=rabConf.getConnection().createChannel();  
        
        //创建交换器
        consumerChannel.exchangeDeclare(EXCHANGENAME,"direct"); 
        
        /**创建处理延时消息的延时队列*/
        Map <String,Object> arg = new HashMap <String,Object>();
        //配置死信交换器
        arg.put("x-dead-letter-exchange",EXCHANGENAME); //交换器名称
        //死信交换路由key (交换器可以将死信交换到很多个其他的消费队列,可以用不同的路由key 来将死信路由到不同的消费队列去)
        arg.put("x-dead-letter-routing-key", ROUTINGKEY); 
        delayChannel.queueDeclare(DELAY_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, arg);  
        
        /**创建消费队列*/
        consumerChannel.queueDeclare(CONSUME_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, null);
        //参数1:绑定的队列名  参数2:绑定至哪个交换器  参数3:绑定路由key
        consumerChannel.queueBind(CONSUME_QUEUE_NAME, EXCHANGENAME,ROUTINGKEY); 
        //最多接受条数 0为无限制,每次消费消息数(根据实际场景设置),true=作用于整channel,false=作用于具体的消费者
        consumerChannel.basicQos(0,10, false);
        
        //创建消费队列的消费者
        Consumer consumer = new DefaultConsumer(consumerChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) 
              throws IOException  {
                String message = new String(body, "UTF-8");
                try {
                    //业务逻辑处理
                    ConsumeMessage(message);  
                    //确认消息已经消费  参数2(true=设置后续消息为自动确认消费  false=为手动确认)
                    consumerChannel.basicAck(envelope.getDeliveryTag(), false);
                }catch (Exception e) { 
                }  
              }
            }; 
                
        boolean flag=false;//是否手动确认消息  true 是  false否 
        consumerChannel.basicConsume(CONSUME_QUEUE_NAME, flag, consumer);
    }
 
    /**
     * 方法描述: 发送延迟订单处理消息
     * @param msg 消息内容 (订单号或者json格式字符串)
     * @param overTime 消息存活时间
     * @throws Exception
     */
    public void sendMessage(String msg,Long overTime) throws Exception{ 
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration(overTime.toString()) //设置消息存活时间(毫秒)
                .build();
        delayChannel.basicPublish("",DELAY_QUEUE_NAME, properties, msg.getBytes("UTF-8"));
    }
    
    /**
     * 
     * 方法描述: 
     * 业务逻辑说明: TODO(总结性的归纳方法业务逻辑) 
     * @param msg 消费消息(订单号,或特定格式json字符串)
     * @throws InterruptedException
     */
    public void ConsumeMessage(String msg) throws InterruptedException { 
        Thread.sleep(50);//模拟业务逻辑处理
        System.out.println("处理到期消息时间=="+System.currentTimeMillis());
        System.err.println("删除订单 order-number  ==  "+msg);
    }
    
    
    public RabbitmqConfiguration getRabConf() {
        return rabConf;
    }
 
 
    public void setRabConf(RabbitmqConfiguration rabConf) {
        this.rabConf = rabConf;
    }
    
    public static void main(String[] args) throws Exception {
        OrderOverTimeQueue ooto=new OrderOverTimeQueue();
        RabbitmqConfiguration rf= new RabbitmqConfiguration();
        rf.init();
        ooto.setRabConf(rf);
        ooto.init();
 
        //模拟用户产生订单 消息生存时长为30秒
        ooto.sendMessage("20180907-order-number", 10000l);
        
        System.out.println("创建消息时间=="+System.currentTimeMillis());
        
        
        
    }
    
    
}

最终效果
20180909212141792.png
如果消息还存活的话,在延迟队列中的“ready”和“total”中都会存在相应的消息记录数
20180909211433631.png

写的比较粗糙 欢迎大家发表自己的观点 >_< !

相关文章

  • 基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处

    前言 传统处理超时订单 采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大...

  • rabbitmq延时队列

    延时队列在实际业务场景中可能会用到延时消息发送,例如支付场景,准时支付、超过未支付将执行不同的方案,其中超时未支付...

  • RabbitMQ消息过期时间设置多值最佳实践

    前言 电商平台里订单支付超时逻辑使用了 RabbitMQ的消息生存时间特性(Time to Live),如果订单设...

  • (3)zset实现延时队列(2)

    1、场景: 订单超时未支付,取消订单,恢复库存 创建的订单加入redis延时队列,单独线程轮循处理过期订单,如已支...

  • 使用Spring Cloud Stream玩转RabbitMQ,

    前一章我们讲了《SpringBoot RabbitMQ消息队列的重试、超时、延时、死信队列[https://my....

  • rabbitMQ-延时队列

    延时队列我们可以简单粗暴的理解它为延时发送消息的队列 那延时队列的应用场景有哪些呢,比如订单在一段时间内未支付则取...

  • MQ 入门实践

    MQ Message Queue,消息队列,FIFO 结构。 例如电商平台,在用户支付订单后执行对应的操作; 优点...

  • 基于Rabbitmq实现延迟队列

    转自 基于Rabbitmq实现延迟队列 基于Rabbitmq实现延迟队列 延迟队列的使用场景 淘宝订单业务:下单后...

  • spring boot 集成rabbitmq 实现延迟队列

    rabbitmq 实现延迟队列 什么是延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息...

  • RabbitMQ 集群原理和完善

    一、RabbitMQ集群方案的原理 RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang...

网友评论

    本文标题:基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处

    本文链接:https://www.haomeiwen.com/subject/yimjzftx.html