1、消费者核心配置
consumeFromWhere(某些情况失效:参考https://blog.csdn.net/a417930422/article/details/83585397)
CONSUME_FROM_FIRST_OFFSET: 初次从消息队列头部开始消费,即历史消息(还储存在broker的)全部消费一遍,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_LAST_OFFSET: 默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP : 从某个时间点开始消费,默认是半个小时以前,后续再启动接着上次消费的进度开始消费
allocateMessageQueueStrategy:
负载均衡策略算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配
offsetStore:
消息消费进度存储器 offsetStore 有两个策略:
LocalFileOffsetStore 和 RemoteBrokerOffsetStor,广播模式默认使用LocalFileOffsetStore 集群模式默认使用RemoteBrokerOffsetStore
consumeThreadMin
最小消费线程池数量
consumeThreadMax
最大消费线程池数量
pullBatchSize:
消费者去broker拉取消息时,一次拉取多少条。可选配置
consumeMessageBatchMaxSize:
单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置
messageModel :
消费者消费模式, CLUSTERING——默认是集群模式CLUSTERING BROADCASTING——广播模式
2、集群和广播模式下消费端处理
如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量
集群模式(默认):
Consumer实例平均分摊消费生产者发送的消息
例子:订单消息,一般是只被消费一次
广播模式:
广播模式下消费消息:投递到Broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用
例子:群公告,每个人都需要消费这个消息
3、PushConsumer、PullConsumer消费模式
Push
实时性高;但增加服务端负载,消费端能力不同,如果Push推送过快,消费端会出现很多问题
Pull
消费者从Server端拉取消息,主动权在消费者端,可控性好;但间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理
长轮询:
Client请求Server端也就是Broker的时候, Broker会保持当前连接一段时间 默认是15s,如果这段时间内有消息到达,则立刻返回给Consumer.没消息的话,超过15s,则返回空,再进行重新请求;主动权在Consumer中,Broker即使有大量的消息,也不会主动提送Consumer, 缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控,否则会一堆连接
PushConsumer本质是长轮训
系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡,
在broker端可以通过longPollingEnable=true来开启长轮询
消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
服务端代码:broker.longpolling
虽然是push,但是代码里面大量使用了pull,是因为使用长轮训方式达到Push效果,既有pull有的,又有Push的实时性
PullConsumer需要自己维护Offset(参考官方例子)
官方例子路径:org.apache.rocketmq.example.simple.PullConsumer
获取MessageQueue遍历
客户维护Offset,需用用户本地存储Offset,存储内存、磁盘、数据库等
处理不同状态的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4种状态
灵活性高可控性强,但是编码复杂度会高
优雅关闭:主要是释放资源和保存Offset,需用程序自己保存好Offset,特别是异常处理的时候
网友评论