ActiveMQ的设计目标之一是成为高性能的消息总线。为了高效使用网络资源,broker使用push模型把消息分发给消费者。这样可以确保消费者的本地消息缓冲区中,总有待处理的消息。这就是activeMq的prefetch机制,相较于消费者显式的从broker中pull消息。预先将消息push到每个消费者的缓冲区可以降低消息的处理时延.
然而,如果不限制每次push给消费者的消息个数将会是危险的,过多的消息会导致消费者客户端资源耗尽。尤其是当消息的消费速度显著慢于消息的分发速度时。而且如果不加以限制的话会导致消息被单一的消费者消费,其他消费者处于空闲状态,我们就遇到过这种情况,activemq默认的 缓冲区大小1000条message, 有时其中一个消费者缓冲区中有800多条message,而另一个消费者处于空闲状态,这就导致资源没有充分利用,而且降低了消息的处理速度。 为了避免这种情况,ActiveMQ使用预取极限(prefetch limit)来限制一次性分发给单个消费者的最大消息个数。消费者则使用预取极限(prefetch limit)来设置其消息缓冲区的大小。
可以在ActiveMQConnectionFactory或ActiveMQConnection上指定ActiveMQPrefetchPolicy的实例,具体有四个配置:
- persistent queues (default value: 1000)
- non-persistent queues (default value: 1000)
- persistent topics (default value: 100)
- non-persistent topics (default value: Short.MAX_VALUE - 1)
有两种方法可以配置prefetch limit:
- 配置到connection url的属性中,例如:
//指定所有类型的prefetch limit 为50
tcp://localhost:61616?jms.prefetchPolicy.all=50
//修改queue prefetch limit
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
- 针对每个consumer进行配置
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
前面也提到过,prefetch limit也不是越大越好,能够很快处理的Message,相应的consumer我们可以将prefetch limit设置的较大一点,而处理比较慢的Message,相应的consumer最好将prefetch limit设置为1,这可确保消费者一次只处理一条消息,Message不会pending到缓冲区,并且能够充分利用多个Message Consumer的资源。如果将prefetch limit设置为0,将导致consumer一次一个地轮询pull消息,而不是将消息push到消费者缓冲区,前面提到过这种方式会增加时延,间接的降低了消息的处理速度。
网友评论