1 Message Cursors(消息指针)
在使用非持久消息传递时,ActiveMQ的早期版本中的一个常见问题是RAM缓存区不足。
ActiveMQ 5.0之前的版本在内存中保留了对所有可能分发给活动的“持久主题使用者”或“队列”的消息的引用。尽管引用本身并不大,但确实对可以挂起的最大消息数量施加了限制。
消息系统分发持久消息的一种典型方法是,当客户端准备使用它们时,使用游标保持下一个分配位置,从长期存储中批量提取它们。这是一种健壮且可扩展性强的方法,但是对于消费者(一个或多个)可以跟上消息的产生者(一个或多个)的情况,并不是最有效的方法。
ActiveMQ 5.0采用了一种混合方法,允许消息直接从生产者传递到使用者(在消息保留之后),但是如果消费者慢下来,则切换回使用游标。
当消息的消费者处于活跃状态并且处理能力较强时,被持久化存储的消息直接发送到与消费者相关联的发送队列,如下:
当消息已经出现积压,消费者再开始活跃,或者是消费者的消费速度比消息的发送速度慢时,消息将从Pending Cursor中提取,并发送与消费者关联的发送队列。见下图:
1.1 游标类型
ActiveMQ 5.0中的默认消息光标类型是基于存储的。它的行为如上所述。可以使用两种其他类型的游标:VM游标和基于文件的游标。同时支持非持久消息的处理,Store-based内嵌了File-based的模式,非持久消息直接被Non-persistent pending Cursor所处理。工作模式见下图:
1.2 VM游标
VM Cursor是ActiveMQ 4.x的工作方式:对消息的引用保存在内存中,并在需要时传递给调度队列。这可能非常快,但也有一个缺点,就是无法处理非常慢的消费者或长时间不活动的消费者。
1.3 基于文件的游标
基于文件的游标源自VM游标。当代理中的内存达到其限制时,它可以将消息分页到磁盘上的临时文件:
1.4 配置使用
默认情况下,使用基于存储的游标,但是可以根据目标配置不同的游标。
- Topic subscribers
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<deadLetterStrategy>
<individualDeadLetterStrategy topicPrefix="Test.DLQ." />
</deadLetterStrategy>
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
<pendingDurableSubscriberPolicy>
<vmDurableCursor/>
</pendingDurableSubscriberPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
说明:
有效的Subscriber类型是vmCursor和fileCursor,缺省是store based cursor。有效的持久化subscribe的cursor types是storeDurableSubscriberCursor,vmDurableCursor和fileDurableSubscriberCursor,缺省是store based cursor。
- Queue
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="org.apache.>" >
<deadLetterStrategy>
<individualDealLetterStrategy topicPrefix="Test.DLQ"/>
</deadLetterStrategy>
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
2 Async Send(异步发送)
ActiveMQ支持异步和同步发送消息,是可以配置,通常对于快的消费者,是直接把消息同步发送过去,但是对于一个慢消费者,你使用同步发送消息可能出现producer堵塞等现象,慢消费者适合异步发送。
配置使用:
-
ActiveMQ默认设置dispatchAsync=true是最好的性能设置。如果你处理Fast Consumer则使用dispatchAsync=false
-
在Connection URI级别来配置使用Async Send
factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616?jms.useAsyncSend=true");
- 在ConnectionFactory级别来配置使用Async Send
factory.setUseAsyncSend(true);(此方法是存在于ActiveMQConnectionFactory中的)
- Connection级别来配置使用Async Send
connection.setUseAsyncSend(true); (此方法存在于ActiveMQConnection中)
3 Optimized Acknowledgement(消息确认机制)
ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。若希望禁止使用经过优化的确认方式,有以下几种方式:
- 在Connection URI 上禁止启用Optimized Acknowledgements。
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
- 在ConnectionFactory 上禁止启用Optimized Acknowledgements。
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
- 在Connection上禁止启用Optimized Acknowledgements。
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);
4 Dispatch Policies(分发策略)
ActiveMQ的prefetch缺省参数,是针对处理大量消息时的高性能和高吞吐量而设置的,所以缺省的prefetch参数比较大,而且缺省的dispatche policies会尝试尽可能快的填满缓冲。
然而有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这些少量的消息总是倾向于被分发到个边的consumer上,这样就会因为负载的不均衡而导致处理时间的增加。
对于队列,您可以定义是否以round-robin
循环方式(默认行为)进行调度,还是在调度过程选择下一个消费者(strictOrderDispatch)之前是否耗尽一个消费者的预取缓冲区。
4.1 Queue配置
queue配置成 strictOrderDispatch 模式。
<policyEntry queue=">" strictOrderDispatch="false" />
注意:
从5.14.0版开始-当有一个使用者时,strictOrderDispatch = true(round-robin) 选项将确保重新分发的消息的严格顺序。
4.2 Topic配置
Round Robin dispatch policy会尝试平均分发消息:
<policyEntry topic="FOO.>">
<dispatchPolicy>
<roundRobinDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
5 Producer Flow Control(生产者流量控制)
流量控制的含义:当生产者产生消息过快,超过流量限制的时候,生产者将会被阻塞知道资源可以继续使用,或者抛出一个JMSException,可以通过来配置。 值得注意的是,默认<systemUsage>
设置会在达到或限制时导致生产者阻塞:
-
同步发送消息的producer会自动使用producer flow control。
-
对于异步发送消息的producer,要使用producer flow control,先要为connection配置一个
ProducerWindowSize
参数,如下:
ActiveMQConnectionFactory connctionFactory = connctionFactory.setProducerWindowSize(1024000);
ProducerWindowSize
是producer在发送消息的过程中,收到broker对于之前发送消息的确认之前,能够发送消息的最大字节数。
5.1 禁用 producer flow control
可以通过producerFlowControl在Broker配置中将适当的目标策略上的标志设置为false来对代理上的特定JMS队列和主题禁用流控制。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>" producerFlowControl="false"/>
</policyEntries>
</policyMap>
</destinationPolicy>
注意:
自从ActiveMQ 5.x中引入新的消息游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。结果就是一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。
如果想把所有的非持久化消息存放到内存中,并在达到内存限制的时候停掉生产者,你需要配置,示例如下:
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
上面的片段将确保所有非持久队列消息都保留在内存中,每个队列的限制为1Mb。
5.2 配置客户端异常
为了应对broker
空间不足,而导致的不确定的阻塞send()
方法的一种替代方案,就是将其配置成客户端抛出的一个异常,通过将sendFailIfNoSpace
属性设置为true
,代理将会引起send()
方法失败,并抛出javax.jms.ResourceAllocationException
异常,传播到客户端,配置示例:
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
此属性的优点是客户端可以捕获javax.jms.ResourceAllocationException
,稍等片刻然后重试该send()
操作,而不仅仅是无限期地挂起。
从版本5.3.1开始,sendFailIfNoSpaceAfterTimeout
已添加该属性。此属性会导致send()
操作失败,并在客户端发生异常,但仅在等待给定的时间后才会失败。如果在配置的时间后仍未释放代理上的空间,则send()
客户端操作才会失败。下面是一个示例:
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
超时以毫秒为单位定义,因此以上示例在send()
操作失败之前要等待三秒钟。此属性的优点是,它将在配置的时间量内阻塞,而不是立即失败或无限期阻塞。此属性不仅在代理方面提供了改进,而且还为客户端提供了改进,因此它可以捕获异常,稍等片刻然后重试该send()
操作。
网友评论