美文网首页
RabbitMQ 消费端限流

RabbitMQ 消费端限流

作者: qyfl | 来源:发表于2019-09-26 22:25 被阅读0次

    RabbitMQ 提供了一种 qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数据的消息(通过基于 comsume 或者 channel 设置 qos 的值)未被确认前,不消费新的消息。

    注意事项:

    • 消费者的 autoAck 要设置为 false。
    • 消费者要写确认消息的代码。

    消费者代码:

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. 创建一个 ConnectionFactory,并配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            // 2. 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            // 3. 通过 connection 创建 Channel
            Channel channel = connection.createChannel();
    
            // 4. 声明一个队列
            String queueName = "test001";
            String exchangeName = "test_qos_exchange";
            String routingKey = "qos.#";
            channel.queueDeclare(queueName, true, false, false, null);
            channel.exchangeDeclare(exchangeName, "topic", true, true, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            // 5. 创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            // 6. 设置 Channel
            channel.basicQos(0,1,false);
            channel.basicConsume(queueName, false, new myComsumer(channel));
        }
    }
    

    自定义消费者:

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class myComsumer extends DefaultConsumer {
    
        private Channel channel;
    
        public myComsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            super.handleDelivery(consumerTag, envelope, properties, body);
            System.out.println("消费端:" + new String(body));
    
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
    

    相关文章

      网友评论

          本文标题:RabbitMQ 消费端限流

          本文链接:https://www.haomeiwen.com/subject/mivquctx.html