一、point
- usage of prefetchSize
- flow chart && source code
二、usage
- 创建ActiveMQConnection的时候指定。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(10001);
factory.setPrefetchPolicy(prefetchPolicy);
- 创建Queue的时候指定prefetchSize。
Queue queue = session.createQueue("studyQueue?consumer.prefetchSize=100");
三、flow chart && source code
主要是创建consumer时向broker注册一下consumer,然后传递prefetchSize给broker,作为broker的pending队列的大小。
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
String name, String selector, int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser,
boolean dispatchAsync, MessageListener messageListener) throws JMSException {
// ...... 部分省略
// 设置从ActiceMQConnection中读取的prefetchSize
this.info.setPrefetchSize(prefetch);
// ...... 部分省略
// Allows the options on the destination to configure the consumerInfo
// 设置定义在Queue中的属性,包括prefetchSize
if (dest.getOptions() != null) {
Map<String, Object> options = IntrospectionSupport.extractProperties(
new HashMap<String, Object>(dest.getOptions()), "consumer.");
IntrospectionSupport.setProperties(this.info, options);
if (options.size() > 0) {
String msg = "There are " + options.size()
+ " consumer options that couldn't be set on the consumer."
+ " Check the options are spelled correctly."
+ " Unknown parameters=[" + options + "]."
+ " This consumer cannot be started.";
LOG.warn(msg);
throw new ConfigurationException(msg);
}
}
// ...... 部分省略
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
} catch (JMSException e) {
this.session.removeConsumer(this);
throw e;
}
if (session.connection.isStarted()) {
start();
}
}

网友评论