美文网首页rabbitMQ
RabbitMQ学习(五)消费端削峰限流

RabbitMQ学习(五)消费端削峰限流

作者: kobe0429 | 来源:发表于2018-11-28 11:12 被阅读0次

    1.MQ的作用

    1)解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
    2)冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
    扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
    3)削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
    4)可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
    5)顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
    6)缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
    7)异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处。

    本文只讨论削峰填谷的应用场景:

    举个业务场景的栗子,秒杀业务:
    上游发起下单操作
    下游完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)
    上游下单业务简单,每秒发起了10000个请求,下游秒杀业务复杂,每秒只能处理2000个请求,很有可能上游不限速的下单,导致下游系统被压垮,引发雪崩。
    为了避免雪崩,常见的优化方案有两种:
    1)业务上游队列缓冲,限速发送
    2)业务下游队列缓冲,限速执行

    本文只讨论下游队列,就是消费端的限速执行

    rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
    使用 basicqos方法:
    在消费端进行使用。 0 1 false
    prefetSize:0
    prefetCount:这个值一般在设置为非自动ack的情况下生效,一般大小为1
    global: true是channel级别, false是消费者级别
    注意:我们要使用非自动ack
    消费者代码:

    package com.bfxy.rabbitmq.api.limit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Consumer {
    
        
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("10.136.197.244");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            
            String exchangeName = "test_qos_exchange";
            String queueName = "test_qos_queue";
            String routingKey = "qos.#";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //1 限流方式  第一件事就是 autoAck设置为 false
            
            channel.basicQos(0,3,false);
            channel.basicConsume(queueName,false,new MyConsumer(channel));
        }
    }
    

    自定义消费者代码:

    package com.bfxy.rabbitmq.api.limit;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class MyConsumer extends DefaultConsumer {
    
    
        private Channel channel ;
        
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
            
            channel.basicAck(envelope.getDeliveryTag(), false);
            
        }
    
    
    }
    

    生产者代码:

    package com.bfxy.rabbitmq.api.limit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
    
        
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("10.136.197.244");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
    
    
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchange = "test_qos_exchange";
            String routingKey = "qos.save";
            
            String msg = "Hello RabbitMQ QOS Message";
            
            for(int i =0; i<5; i ++){
                channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
            }
            
        }
    }
    

    调试步骤:
    1)启动消费者类,效果如图:

    消费者启动mq交换机信息.JPG
    消费者启动mq队列信息.JPG
    2)在自定义消费者类中注释掉channel.basicAck(envelope.getDeliveryTag(), false);
    启动生产者类,mq管控台信息
    管控台信息.JPG
    可以看到1个待确认的,4个准备好的消息,
    3)放开代码channel.basicAck(envelope.getDeliveryTag(), false);
    启动生产者类,mq管控台信息
    管控台信息.JPG
    总结:消费者消费成功一个消息后,需要设置成手动确认,当返回确认成功后,再去消费下一个消息,这样可以实现消费端的削峰限流,不至于让消费端服务崩溃。
    到这里是不是以为结束了呢,其实还有一个知识点,就是消费端对没有消费成功的消息,可以不进行确认,让其重回队列,再次消费,与上面的代码相比,只需修改自定义的消费者,设置如果满足我们自己设置的条件就认为是没有消费成功,让其重回队列,这个时候broker端会再此发出这条消息。
    修改如下:
    重回队列.JPG
    启动生产者和消费者,消费者控制台信息如下:
    重复消费未确认的消息.JPG

    相关文章

      网友评论

        本文标题:RabbitMQ学习(五)消费端削峰限流

        本文链接:https://www.haomeiwen.com/subject/wicaqqtx.html