美文网首页
RocketMQ消费者核心知识

RocketMQ消费者核心知识

作者: JBryan | 来源:发表于2020-04-26 15:37 被阅读0次

1、消费者核心配置

consumeFromWhere(某些情况失效:参考https://blog.csdn.net/a417930422/article/details/83585397)

CONSUME_FROM_FIRST_OFFSET: 初次从消息队列头部开始消费,即历史消息(还储存在broker的)全部消费一遍,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_LAST_OFFSET: 默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP : 从某个时间点开始消费,默认是半个小时以前,后续再启动接着上次消费的进度开始消费

allocateMessageQueueStrategy:
负载均衡策略算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配

offsetStore:
消息消费进度存储器 offsetStore 有两个策略:
LocalFileOffsetStore 和 RemoteBrokerOffsetStor,广播模式默认使用LocalFileOffsetStore 集群模式默认使用RemoteBrokerOffsetStore

consumeThreadMin
最小消费线程池数量

consumeThreadMax
最大消费线程池数量

pullBatchSize:
消费者去broker拉取消息时,一次拉取多少条。可选配置

consumeMessageBatchMaxSize:
单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置

messageModel :
消费者消费模式, CLUSTERING——默认是集群模式CLUSTERING BROADCASTING——广播模式

2、集群和广播模式下消费端处理

如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量

集群模式(默认):
Consumer实例平均分摊消费生产者发送的消息
例子:订单消息,一般是只被消费一次

广播模式:
广播模式下消费消息:投递到Broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用
例子:群公告,每个人都需要消费这个消息

3、PushConsumer、PullConsumer消费模式

Push
实时性高;但增加服务端负载,消费端能力不同,如果Push推送过快,消费端会出现很多问题

Pull
消费者从Server端拉取消息,主动权在消费者端,可控性好;但间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理

长轮询:
Client请求Server端也就是Broker的时候, Broker会保持当前连接一段时间 默认是15s,如果这段时间内有消息到达,则立刻返回给Consumer.没消息的话,超过15s,则返回空,再进行重新请求;主动权在Consumer中,Broker即使有大量的消息,也不会主动提送Consumer, 缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控,否则会一堆连接

PushConsumer本质是长轮训
系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡,
在broker端可以通过longPollingEnable=true来开启长轮询
消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
服务端代码:broker.longpolling
虽然是push,但是代码里面大量使用了pull,是因为使用长轮训方式达到Push效果,既有pull有的,又有Push的实时性

PullConsumer需要自己维护Offset(参考官方例子)
官方例子路径:org.apache.rocketmq.example.simple.PullConsumer
获取MessageQueue遍历
客户维护Offset,需用用户本地存储Offset,存储内存、磁盘、数据库等
处理不同状态的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4种状态
灵活性高可控性强,但是编码复杂度会高
优雅关闭:主要是释放资源和保存Offset,需用程序自己保存好Offset,特别是异常处理的时候

相关文章

网友评论

      本文标题:RocketMQ消费者核心知识

      本文链接:https://www.haomeiwen.com/subject/shzdwhtx.html