美文网首页
rabbitmq限流模型实例

rabbitmq限流模型实例

作者: Ukuleler | 来源:发表于2019-02-21 11:38 被阅读0次

业务需求是通过rabbitmq进行限流,google一堆资源也没找到满意的。
最后找到一个留作记录模型使用的生产者消费者模式。代码如下
RabbitMQConf 生产者配置文件

@Configuration
public class RabbitMQConf {
    public static final String queueName = "chdeck.phone.number";

    @Bean
    public Queue helloQueue() {
        return new Queue(queueName);
    }
}

Producer

@Component
public class Producer {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send() {
        for (int i = 0; i < 500; i++) {
            System.out.println("发了"+i+"条消息");
            rabbitTemplate.convertAndSend(RabbitMQConf.queueName, "hello" + i);
        }

    }
}

生产者很简单,无需多言,一次产生500条消息。下面看消费者
消费者的坑很多,找了很多资料最后成功的这个
consumer

@Component
public class Consumer {
    private static final String queueName = "chdeck.phone.number";
    private static final int BATCH_SIZE = 100;
    Channel channel;

    @PostConstruct
    public void init() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("*******");
        factory.setPort(*****);
        factory.setUsername("*****");
        factory.setPassword("******");
        factory.setVirtualHost("/");
        Connection conn = factory.newConnection();
        channel = conn.createChannel();
    }

    public void receive() throws IOException {
        String queue001 = "chdeck.phone.number";
//        channel.exchangeDeclare(exchange001, "direct", true, false, null);
        channel.queueDeclare(queue001, true, false, false, null);
        //        设置限流策略
        //        channel.basicQos(获取消息最大数[0-无限制], 依次获取数量, 作用域[true作用于整个channel,false作用于具体消费者]);
        channel.basicQos(0, 100, false);
        //        自定义消费者
        Executer myExecuter = new Executer(channel);
        //        进行消费,签收模式一定要为手动签收
        channel.basicConsume(queue001, false, myExecuter);
    }
}

Executer

public class Executer extends DefaultConsumer {
    private Channel channel;
     public Executer(Channel channel) {
         super(channel);
         // TODO Auto-generated constructor stub
         this.channel=channel;
     }

     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
             throws IOException {
         System.out.println("body:==="+new String(body)+" t:" + System.currentTimeMillis());
 //        手动签收,一定要有消费者签收,如果没有如下代码,则限流模式下,仅能打印出来channel.basicQos(0, 2, false);第二参数的2条信息
         channel.basicAck(envelope.getDeliveryTag(), false);
         try {
            //模拟程序执行时间
             Thread.sleep(100);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
}

注意如果想要启动消费者进行消费,切记不可使用单元测试。因为单元测试有固定执行时间,只会消费几条消息就会自动停止。正确方法应该是通过条件注解,@Conditional***。来执行,实在不行写一个controller手动执行也可以,只需要启动一次即可。

相关文章

网友评论

      本文标题:rabbitmq限流模型实例

      本文链接:https://www.haomeiwen.com/subject/lyhryqtx.html