美文网首页
RabbitMQ拉模式批量消费消息

RabbitMQ拉模式批量消费消息

作者: Java大生 | 来源:发表于2018-12-05 16:50 被阅读0次

    实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。

    实现推模式推荐的方式是继承 DefaultConsumer 基类,也可以使用Spring AMQP的 SimpleMessageListenerContainer 。

    推模式是最常用的,但是有些情况下推模式并不适用的,比如说:

    由于某些限制,消费者在某个条件成立时才能消费消息

    需要批量拉取消息进行处理

    实现拉模式

    RabbitMQ的Channel提供了 basicGet 方法用于拉取消息。

    /**

    * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}

    * @see com.rabbitmq.client.AMQP.Basic.Get

    * @see com.rabbitmq.client.AMQP.Basic.GetOk

    * @see com.rabbitmq.client.AMQP.Basic.GetEmpty

    * @param queue the name of the queue

    * @param autoAck true if the server should consider messages

    * acknowledged once delivered; false if the server should expect

    * explicit acknowledgements

    * @return a {@link GetResponse} containing the retrieved message data

    * @throws java.io.IOException if an error is encountered

    */

    GetResponse basicGet(String queue, boolean autoAck) throws IOException;

    basicGet 返回 GetResponse 类。

    public class GetResponse {

        private final Envelope envelope;

        private final BasicProperties props;

        private final byte[] body;

        private final int messageCount;

        // ...

    rabbitmq-client版本4.0.3

    使用 basicGet 拉取消息需要注意:

    basicGet

    DefaultConsumer

    示例代码:

    private void consume(Channel channel) throws IOException, InterruptedException {

        while (true) {

            if (!isConditionSatisfied()) {

                TimeUnit.MILLISECONDS.sleep(1);

                continue;

            }

            GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false);

            if (response == null) {

                TimeUnit.MILLISECONDS.sleep(1);

                continue;

            }

            String data = new String(response.getBody());

            logger.info("Get message <= {}", data);

            channel.basicAck(response.getEnvelope().getDeliveryTag(), false);

        }

    }

    批量拉取消息

    RabbitMQ支持客户端批量拉取消息,客户端可以连续调用 basicGet 方法拉取多条消息,处理完成之后一次性ACK。需要注意:

    basicGet

    basicAck

    示例代码:

    String bridgeQueueName = extractorProperties.getBridgeQueueName();

    int batchSize = extractorProperties.getBatchSize();

    List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize);

    long tag = 0;

    while (responseList.size() < batchSize) {

        GetResponse getResponse = channel.basicGet(bridgeQueueName, false);

        if (getResponse == null) {

            break;

        }

        responseList.add(getResponse);

        tag = getResponse.getEnvelope().getDeliveryTag();

    }

    if (responseList.isEmpty()) {

        TimeUnit.MILLISECONDS.sleep(1);

    } else {

        logger.info("Get <{}> responses this batch", responseList.size());

        // handle messages

        channel.basicAck(tag, true);

    }

    关于QueueingConsumer

    QueueingConsumer 在客户端本地使用 BlockingQueue 缓冲消息,其nextDelivery方法也可以用于实现拉模式(其本质上是 BlockingQueue.take ),但是 QueueingConsumer 现在已经标记为Deprecated。

    欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163

    群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

    相关文章

      网友评论

          本文标题:RabbitMQ拉模式批量消费消息

          本文链接:https://www.haomeiwen.com/subject/pfjwcqtx.html