参考
有思考的大佬博客
https://www.jianshu.com/p/f071d5069059
概述
PushConsumer的非顺序消息执行流程:
- Consumer每20s重新做一次负载均衡更新,根据从Broker存储的ConsumerGroup和Topic信息,把MessageQueue分发给不同的Consumer,负载策略默认是分页
- 每个MessageQueue对应一个pullRequest,全部存储到该Consumer的pullRequestQueue队列里面
- Consumer启动独立后台PullMessageService线程,不停的尝试从pullRequestQueue.take()获取PullRequest
- 捞取到PullRequest会先做缓存校验(默认一个Queue里面缓存待处理消息个数不超过1000个,消息大小不超过100M,否则会延迟50ms再重试),从而保证客户端的缓存负载不会过高
- PullRequest发送给Broker,如果Broker发现该Queue有待处理的消息,就会直接返回给Consumer,Consumer接收响应以后,重新把该PullRequest丢到自己的pullRequestQueue队列里面,从而重复执行捞取消息的动作,保证消息的及时性
- PullRequest发送给Broker,如果Broker发现该Queue没有待处理的消息,则会Hold住这个请求,暂不响应给Consumer,默认长轮询是5s重试获取一次待处理消息,如果有新的待处理消息则立刻Response给Consumer,当客户端检测到消息挂起超时(客户端有默认参数 响应超时时间 20s),会重新发起PullRequest给Broker
-
负载均衡放在Consumer端处理,而不是由Broker处理
-
PushConsumer采用:长轮询+超时时间+Pull的模式
- 可以保证Consumer的负载不会过高,因为Broker是不会主动去给Consumer推送消息,只有Consumer校验自己的缓存消息没有超过阈值才会去从Broker拉取消息
- 可以保证消息的即时性,因为如果Broker持续有新的消息产生,Consumer不停的Pull,Broker不停的Response,重复执行
- Broker端无效请求的次数大大降低:Broker如果当前没有待处理消息,会挂起PullRequest,而Consumer在未接收到Response且未超时时,是不会重新发起PullRequest的
- 长轮询的原理:
https://www.jianshu.com/p/ac4ff1a8133b - 消费者会不停的从PullRequest的队列里取request然后向broker请求消息,得到broker的响应后会做相应处理并把PullRequest放回队列以便下一次请求
- broker在查不到消息的情况下会hold住请求,在ReputMessageService不停构建ConsumeQueue的时候,会拿出hold住的请求进行二次处理
PullConsumer模式
特点:代码更繁琐,但是业务更自由,由代码控制pull的规则(间隔多长时间pull一次)。
但是一般RMQ的Push模式已经优化的很好了,所以PullConsumer模式在实际应用中用的较少。
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111");
for (MessageQueue mq : mqs) {
SINGLE_MQ:
while (true) {
PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
System.out.println(pullResult.getMsgFoundList().get(0).toString());
break;
case NO_NEW_MSG:
break SINGLE_MQ;
default:
break;
}
}
}
PushConusmer模式:
结合了Push和Pull的特点,Rocketmq已经优化好了Pull的规则,用户只要编写回调接口即可。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
网友评论