美文网首页
RocketMQ的读取

RocketMQ的读取

作者: 黄靠谱 | 来源:发表于2019-02-08 14:32 被阅读27次

参考

有思考的大佬博客
https://www.jianshu.com/p/f071d5069059

概述

PushConsumer的非顺序消息执行流程:

  • Consumer每20s重新做一次负载均衡更新,根据从Broker存储的ConsumerGroup和Topic信息,把MessageQueue分发给不同的Consumer,负载策略默认是分页
  • 每个MessageQueue对应一个pullRequest,全部存储到该Consumer的pullRequestQueue队列里面
  • Consumer启动独立后台PullMessageService线程,不停的尝试从pullRequestQueue.take()获取PullRequest
  • 捞取到PullRequest会先做缓存校验(默认一个Queue里面缓存待处理消息个数不超过1000个,消息大小不超过100M,否则会延迟50ms再重试),从而保证客户端的缓存负载不会过高
  • PullRequest发送给Broker,如果Broker发现该Queue有待处理的消息,就会直接返回给Consumer,Consumer接收响应以后,重新把该PullRequest丢到自己的pullRequestQueue队列里面,从而重复执行捞取消息的动作,保证消息的及时性
  • PullRequest发送给Broker,如果Broker发现该Queue没有待处理的消息,则会Hold住这个请求,暂不响应给Consumer,默认长轮询是5s重试获取一次待处理消息,如果有新的待处理消息则立刻Response给Consumer,当客户端检测到消息挂起超时(客户端有默认参数 响应超时时间 20s),会重新发起PullRequest给Broker
  1. 负载均衡放在Consumer端处理,而不是由Broker处理

  2. PushConsumer采用:长轮询+超时时间+Pull的模式

  • 可以保证Consumer的负载不会过高,因为Broker是不会主动去给Consumer推送消息,只有Consumer校验自己的缓存消息没有超过阈值才会去从Broker拉取消息
  • 可以保证消息的即时性,因为如果Broker持续有新的消息产生,Consumer不停的Pull,Broker不停的Response,重复执行
  • Broker端无效请求的次数大大降低:Broker如果当前没有待处理消息,会挂起PullRequest,而Consumer在未接收到Response且未超时时,是不会重新发起PullRequest的
  1. 长轮询的原理:
    https://www.jianshu.com/p/ac4ff1a8133b
  2. 消费者会不停的从PullRequest的队列里取request然后向broker请求消息,得到broker的响应后会做相应处理并把PullRequest放回队列以便下一次请求
  3. broker在查不到消息的情况下会hold住请求,在ReputMessageService不停构建ConsumeQueue的时候,会拿出hold住的请求进行二次处理

PullConsumer模式

特点:代码更繁琐,但是业务更自由,由代码控制pull的规则(间隔多长时间pull一次)。
但是一般RMQ的Push模式已经优化的很好了,所以PullConsumer模式在实际应用中用的较少。

    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setInstanceName("consumer");
    consumer.start();
    Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111");
    for (MessageQueue mq : mqs) {
        SINGLE_MQ:
        while (true) {
                PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        System.out.println(pullResult.getMsgFoundList().get(0).toString());
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    default:
                        break;
                }
        }
    }

PushConusmer模式:

结合了Push和Pull的特点,Rocketmq已经优化好了Pull的规则,用户只要编写回调接口即可。

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();

相关文章

网友评论

      本文标题:RocketMQ的读取

      本文链接:https://www.haomeiwen.com/subject/xotosqtx.html