美文网首页
rabbitmq限流模型实例

rabbitmq限流模型实例

作者: Ukuleler | 来源:发表于2019-02-21 11:38 被阅读0次

    业务需求是通过rabbitmq进行限流,google一堆资源也没找到满意的。
    最后找到一个留作记录模型使用的生产者消费者模式。代码如下
    RabbitMQConf 生产者配置文件

    @Configuration
    public class RabbitMQConf {
        public static final String queueName = "chdeck.phone.number";
    
        @Bean
        public Queue helloQueue() {
            return new Queue(queueName);
        }
    }
    

    Producer

    @Component
    public class Producer {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        public void send() {
            for (int i = 0; i < 500; i++) {
                System.out.println("发了"+i+"条消息");
                rabbitTemplate.convertAndSend(RabbitMQConf.queueName, "hello" + i);
            }
    
        }
    }
    

    生产者很简单,无需多言,一次产生500条消息。下面看消费者
    消费者的坑很多,找了很多资料最后成功的这个
    consumer

    @Component
    public class Consumer {
        private static final String queueName = "chdeck.phone.number";
        private static final int BATCH_SIZE = 100;
        Channel channel;
    
        @PostConstruct
        public void init() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*******");
            factory.setPort(*****);
            factory.setUsername("*****");
            factory.setPassword("******");
            factory.setVirtualHost("/");
            Connection conn = factory.newConnection();
            channel = conn.createChannel();
        }
    
        public void receive() throws IOException {
            String queue001 = "chdeck.phone.number";
    //        channel.exchangeDeclare(exchange001, "direct", true, false, null);
            channel.queueDeclare(queue001, true, false, false, null);
            //        设置限流策略
            //        channel.basicQos(获取消息最大数[0-无限制], 依次获取数量, 作用域[true作用于整个channel,false作用于具体消费者]);
            channel.basicQos(0, 100, false);
            //        自定义消费者
            Executer myExecuter = new Executer(channel);
            //        进行消费,签收模式一定要为手动签收
            channel.basicConsume(queue001, false, myExecuter);
        }
    }
    

    Executer

    public class Executer extends DefaultConsumer {
        private Channel channel;
         public Executer(Channel channel) {
             super(channel);
             // TODO Auto-generated constructor stub
             this.channel=channel;
         }
    
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                 throws IOException {
             System.out.println("body:==="+new String(body)+" t:" + System.currentTimeMillis());
     //        手动签收,一定要有消费者签收,如果没有如下代码,则限流模式下,仅能打印出来channel.basicQos(0, 2, false);第二参数的2条信息
             channel.basicAck(envelope.getDeliveryTag(), false);
             try {
                //模拟程序执行时间
                 Thread.sleep(100);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
    }
    

    注意如果想要启动消费者进行消费,切记不可使用单元测试。因为单元测试有固定执行时间,只会消费几条消息就会自动停止。正确方法应该是通过条件注解,@Conditional***。来执行,实在不行写一个controller手动执行也可以,只需要启动一次即可。

    相关文章

      网友评论

          本文标题:rabbitmq限流模型实例

          本文链接:https://www.haomeiwen.com/subject/lyhryqtx.html