美文网首页JavarabbitMQ
RabbitMQ如何削峰限流

RabbitMQ如何削峰限流

作者: 盼旺 | 来源:发表于2019-09-25 09:38 被阅读0次

    MQ的作用

    1)解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
    2)冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
    3)扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
    4)削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
    5)可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
    6)顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
    7)缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
    8)异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处。

    举个栗子,秒杀业务:

    用户端发起下单操作
    服务端完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)
    用户端下单业务简单,每秒发起了10000个请求,服务端秒杀业务复杂,每秒只能处理2000个请求,很有可能用户端不限速的下单,导致服务端系统被压垮,引发雪崩。
    为了避免雪崩,常见的优化方案有两种:
    1)业务用户端队列缓冲,限速发送
    2)业务服务端队列缓冲,限速执行

    服务端的限速执行

    rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
    basicQos方法在RabbitMQ的Java驱动中对应三个方法:

    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
    
    // prefetchSize = 0
    void basicQos(int prefetchCount, boolean global) throws IOException;
    
    // prefetchSize = 0 , global = false
    void basicQos(int prefetchCount) throws IOException;
    
    prefetchSize:预读取的消息内容大小上限(包含),可以简单理解为消息有效载荷字节数组的最大长度限制,0表示无上限。
    prefetchCount:预读取的消息数量上限,0表示无上限。
    global:false表示prefetchCount单独应用于信道上的每个新消费者,true表示prefetchCount在同一个信道上的消费者共享。
    

    消费者代码

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    public class Consumer {
        public static void main(String[] args) throws Exception {
    // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
    // 设置 RabbitMQ 地址
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");
            // 创建一个新的连接
            Connection connection = connectionFactory.newConnection();
    // 创建一个新的频道
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_qos_exchange";
            String queueName = "test_qos_queue";
            String routingKey = "qos.#";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    // 声明要关注的队列
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            //1 限流方式  第一件事就是 autoAck设置为 false
            channel.basicQos(0,3,false);
     // 不自动回复队列应答 -- RabbitMQ 中的消息确认机制,
            channel.basicConsume(queueName,false,new MyConsumer(channel));
        }
    }
    

    自定义消费者代码

      // DefaultConsumer类 实现了 Consumer 接口, 通过传入一个频道, 告诉服务器我们需要哪个频道的消息
    // 如果频道中有消息, 就会执行回调函数 handleDelivery
    public class MyConsumer extends DefaultConsumer {
        private Channel channel ;
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
    //当我们需要确认一条消息已经被消费时,我们调用的 basicAck 方法的第一个参数是 Delivery Tag。
    //Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
    //以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
    //RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
            channel.basicAck(envelope.getDeliveryTag(), false);
            
        }
    }
    

    生产者代码

    public class Producer {
    
        
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
    // 创建一个新的连接
            Connection connection = connectionFactory.newConnection();
    // 创建一个新的频道
            Channel channel = connection.createChannel();
            
            String exchange = "test_qos_exchange";
            String routingKey = "qos.save";
           
            String msg = "Hello RabbitMQ QOS Message";
           
            for(int i =0; i<5; i ++){
                channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
            }
        }
    }
    

    函数:void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

    exchange 做消息转发用
    routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic
    mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
    immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
    BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留

    mandatory标志告诉服务器至少将该消息发送到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
    参考文章https://www.jianshu.com/p/adf0b7de6753

    相关文章

      网友评论

        本文标题:RabbitMQ如何削峰限流

        本文链接:https://www.haomeiwen.com/subject/jfdiuctx.html