美文网首页
activemq prefetchSize源码分析

activemq prefetchSize源码分析

作者: 超人也害羞 | 来源:发表于2019-07-09 22:59 被阅读0次

一、point

  1. usage of prefetchSize
  2. flow chart && source code

二、usage

  1. 创建ActiveMQConnection的时候指定。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(10001);
factory.setPrefetchPolicy(prefetchPolicy);
  1. 创建Queue的时候指定prefetchSize。
Queue queue = session.createQueue("studyQueue?consumer.prefetchSize=100");

三、flow chart && source code

主要是创建consumer时向broker注册一下consumer,然后传递prefetchSize给broker,作为broker的pending队列的大小。

public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
            String name, String selector, int prefetch,
            int maximumPendingMessageCount, boolean noLocal, boolean browser,
            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
        // ...... 部分省略
        // 设置从ActiceMQConnection中读取的prefetchSize
        this.info.setPrefetchSize(prefetch);
        // ...... 部分省略
        // Allows the options on the destination to configure the consumerInfo
        // 设置定义在Queue中的属性,包括prefetchSize
        if (dest.getOptions() != null) {
            Map<String, Object> options = IntrospectionSupport.extractProperties(
                new HashMap<String, Object>(dest.getOptions()), "consumer.");
            IntrospectionSupport.setProperties(this.info, options);
            if (options.size() > 0) {
                String msg = "There are " + options.size()
                    + " consumer options that couldn't be set on the consumer."
                    + " Check the options are spelled correctly."
                    + " Unknown parameters=[" + options + "]."
                    + " This consumer cannot be started.";
                LOG.warn(msg);
                throw new ConfigurationException(msg);
            }
        }
        // ...... 部分省略
        try {
            this.session.addConsumer(this);
            this.session.syncSendPacket(info);
        } catch (JMSException e) {
            this.session.removeConsumer(this);
            throw e;
        }

        if (session.connection.isStarted()) {
            start();
        }
    }
ActiveMQ prefetchSize原理.jpg

相关文章

网友评论

      本文标题:activemq prefetchSize源码分析

      本文链接:https://www.haomeiwen.com/subject/lxhqkctx.html