场景
在使用RabbitMQ时,默认预取消息队列中的全部数据缓存在本地,导致只有一个进程能获取资源,而其他进程处于闲置状态。而持有资源的进程消费能力有限时,会导致消息队列积压。
说明
消息的轮询分配机制和尽可能快速推送消息的机制给实际使用带来困难。实际情况下,每个消费者处理消息的能力、每个消息处理所需时间可能都是不同的,若只是机械化地顺次分配,可能造成一个消费者由于处理的消息的业务复杂、处理能力低而积压消息,另一个消费者早早处理完所有的消息,处于空闲状态,造成系统的处理能力的浪费。且无法加入新的消费者以提高系统的处理能力。
希望达到的效果是:每个消费者都根据自身处理能力合理分配消息处理任务,既无挤压也无空闲,新加入的消费者也能分担消息处理任务,使系统的处理能力能够平行扩展。
RabbitMQ
客户端可通过Channel
类的basicQos(int prefetchCount)
设置消费者的预取数目,即消费者最大的未确认消息的数目。
假设prefetchCount=10
,有两个消费者,两个消费者依次从队列中抓取10条消息缓存本地,若此时有新的消息到达队列,先判断信道中未确认的消息是否大于或等于20条,若是,则不向信道中投递消息,当信道中未确认消息数小于20条后,信道中哪个消费者未确认消息小于10条,就将消息投递给哪个消费者。
channel.basicQos()
中设置的预取数量多少合适,是一个颇有讲究的问题。我们希望充分利用消费者的处理能力,因此不宜设置过小,否则在消费者处理消息后,RabbitMQ
收到确认消息后才会投递新的消息,导致此期间消费者处于空闲状态,浪费消费者的处理能力;但设置过大,又可能使消息积压在消费者的缓存里,我们希望对于来不及处理的消息,应保留在队列中,便于加入新的消费者或空闲出来的消费者分担消息处理任务。
RabbitMQ
官网的一篇文章详细讨论了预取数量的设置问题:
https://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/
文章大致内容如下:
假设从RabbitMQ
服务端队列取消息、传输消息到消费者耗时为50ms,消费者消费消息耗时4ms,消费者传输确认消息到服务端耗时为50ms。若网络状况、消费者处理速度稳定,则预取数量的最优数值为:(50 + 4 + 50)/4=26个。
最初服务端将向客户端发送26
条消息,并缓存在客户端本地,当消费者处理好第一个消息后,向服务端发送确认消息并取本地缓存的第二个消息,确认消息由客户端传送到服务端耗时50ms
,服务端收到确认后发送新的消息经过50ms
又到达了客户端,而余下的25
个消息被消费耗时为25×4=100ms
,所以当新的消息达到时,第一轮的26
个消息恰好全部处理完。依次类推,之后,每当处理完一个旧有的消息时,恰好会到达一个新的消息。既不会发生消息积压,消费者也不会空闲。
但实际情况是,网络的传输状况、消费者处理消息的速度都不会是恒定的,会时快时慢,造成消息积压或消费者空闲,这就要求预取数量要与网络和消费者的状况实时改变。
总结
说白了,预取数据就是为了控制消费者在获取/发送数据与业务逻辑之间能够更好的衔接,避免某个消息积压或过于空闲的情况出现。
网友评论