消费端的限流?
场景:
假设,我们mq服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现什么样的情况呢?
大量的消息瞬间被全部推送给了这个消费者,但是单个消费者是无法消费这么多消息的。会导致系统崩溃,线上故障发生。
Rabbit MQ提供了一种qos(服务直连保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者是channel设置Qos的值)未被确认前,不进行消费新的消息。这样就保证了大量消息瞬间被推送时候,不至于拖垮服务。
方法:
Void BasiceQos(unit prefetchSize,ushort prefetchCount, boole global);
说明:该方法是在消费端设置的。
参数说明:
prefetchSize:消息大小限制。如消息多少M。设置为0表示不做限制
prefetchCount:一次最多能够处理多少条消息。默认设置1
global:限流策略在什么上使用的。Channle级别还是consumer级别。true:channel级别的设置;false:consumer级别的设置。一般设置为false即可。
注意:
prefetchSize和global这两项,目前版本的rabbit mq没有实现,暂且不做研究。prefetchCount在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。要手工签收
代码演示:
在消费端设置限流:
我们可以看到basicQos有三种方式。
修改消费者代码如下:
因为设置了手动签收的,所以需要修改MyConsumer中的代码。如下图:
限流测试:
1:在myConsumer类中,将受到那个ack相关代码关闭掉,生产者发送五条消息后,看看消费者能够消费多少条数据:
生产端发送5条消息,但是消费者值接收到了一条消息。这是为什么呢?
因为我们启动了限流模式自动签收的,且我们把手动ack注释掉了,限流是1条消息,这个时候因为没有收到第一条消息ack反馈信息。所以其他四条消息不能被推送给消费者,也就不能够被消费了。
我们在页面管控台查看queues下可以看到:
在test_test_limit_queue队列中,有4条消息还是处于准备状态,有一条消息已经消费了。
2:放开被注释掉的手动ack后,我们重新发送消息查看:
启动消费者后:
我们可以看到,五条消息都被顺利的签收了。
总结:
相关代码已经推送到git上面了。欢迎关注凯哥公众号:凯哥Java(kaigejava)或访问凯哥个人博客:www.kaigejava.com
下节预告:
在下一节中,我们将要学习消费端ACK与重回队列
网友评论