美文网首页
Springboot ActiveMQ Consumer配置

Springboot ActiveMQ Consumer配置

作者: 曹振华 | 来源:发表于2017-04-25 20:54 被阅读436次
/**
 * Configures the JMS listener container factories used by message consumers:
 * one factory for topics (publish/subscribe), one for queues (point-to-point),
 * and the task executor both factories hand to their listener containers.
 */
@Configuration
public class JmsListenerContainerConfiguration
{
    /** Receive timeout, in milliseconds, applied to every listener container. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    @Value("${spring.jms.listener.concurrency}")
    private int poolSize;

    @Value("${spring.jms.listener.max-concurrency}")
    private int maxPoolSize;

    private final CachingConnectionFactory cachingConnectionFactory;
    private final ObjectProvider<DestinationResolver> destinationResolver;
    private final ObjectProvider<MessageConverter> messageConverter;
    private final JmsProperties jmsProperties;

    /**
     * Creates the configuration with its collaborators.
     *
     * @param destinationResolver      provider of an optional destination resolver
     * @param messageConverter         provider of an optional message converter
     * @param jmsProperties            the bound {@code spring.jms.*} properties
     * @param cachingConnectionFactory connection factory shared by both listener factories
     */
    @Autowired
    JmsListenerContainerConfiguration(ObjectProvider<DestinationResolver> destinationResolver,
                                      ObjectProvider<MessageConverter> messageConverter,
                                      JmsProperties jmsProperties,
                                      CachingConnectionFactory cachingConnectionFactory) {
        this.destinationResolver = destinationResolver;
        this.messageConverter = messageConverter;
        this.jmsProperties = jmsProperties;
        this.cachingConnectionFactory = cachingConnectionFactory;
    }

    /**
     * Listener container factory for topic (publish/subscribe) destinations.
     *
     * @return a factory whose pub-sub domain flag is always {@code true}
     */
    @Bean(name = { "jmsListenerContainerFactory4Topic" })
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Topic() {
        // Always pub-sub: this factory must not follow the global spring.jms.pub-sub-domain
        // switch, otherwise it would be indistinguishable from the queue factory.
        return newListenerContainerFactory(true);
    }

    /**
     * Listener container factory for queue (point-to-point) destinations.
     *
     * @return a factory whose pub-sub domain flag is always {@code false}
     */
    @Bean(name = { "jmsListenerContainerFactory4Queue" })
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Queue() {
        // Always point-to-point, regardless of the global spring.jms.pub-sub-domain switch.
        return newListenerContainerFactory(false);
    }

    /**
     * Task executor handed to the listener containers for running consumers.
     *
     * @return the executor, sized from the listener concurrency properties
     */
    @Bean
    public ThreadPoolTaskExecutor threadPoolExecutor() {
        ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
        // Number of consumer threads.
        threadPoolExecutor.setCorePoolSize(poolSize);
        threadPoolExecutor.setMaxPoolSize(maxPoolSize);
        // Capacity of the task queue.
        // NOTE(review): with a queue this large the underlying pool only grows past the core
        // size once the queue is full, so maxPoolSize may rarely take effect - confirm intent.
        threadPoolExecutor.setQueueCapacity(10000);
        threadPoolExecutor.setDaemon(true);
        threadPoolExecutor.setKeepAliveSeconds(120);
        threadPoolExecutor.setAllowCoreThreadTimeOut(true);
        // Prefix for the names of the consumer threads.
        threadPoolExecutor.setThreadNamePrefix("fhtMessageConsumer-");
        return threadPoolExecutor;
    }

    /**
     * Builds a listener container factory with the settings common to both domains.
     *
     * @param pubSubDomain {@code true} for topics, {@code false} for queues
     * @return the configured factory
     */
    private DefaultJmsListenerContainerFactory newListenerContainerFactory(boolean pubSubDomain) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setPubSubDomain(pubSubDomain);

        // Apply the optional collaborators only when exactly one candidate bean exists.
        DestinationResolver resolver = destinationResolver.getIfUnique();
        if (resolver != null) {
            factory.setDestinationResolver(resolver);
        }
        MessageConverter converter = messageConverter.getIfUnique();
        if (converter != null) {
            factory.setMessageConverter(converter);
        }

        JmsProperties.Listener listener = jmsProperties.getListener();
        factory.setAutoStartup(listener.isAutoStartup());
        if (listener.getAcknowledgeMode() != null) {
            // Session acknowledge mode taken from the listener properties.
            factory.setSessionAcknowledgeMode(listener.getAcknowledgeMode().getMode());
        }
        String concurrency = listener.formatConcurrency();
        if (concurrency != null) {
            factory.setConcurrency(concurrency);
        }

        factory.setCacheLevel(CACHE_CONSUMER);
        factory.setReceiveTimeout(RECEIVE_TIMEOUT_MILLIS);
        // Calls the @Bean method so both factories receive the same executor bean.
        factory.setTaskExecutor(threadPoolExecutor());
        return factory;
    }

}

相关文章

网友评论

      本文标题:Springboot ActiveMQ Consumer配置

      本文链接:https://www.haomeiwen.com/subject/xvkbzttx.html