/**
* 配置consumer Listener
**/
@Configuration
public class JmsListenerContainerConfiguration
{
@Value("${spring.jms.listener.concurrency}")
private int poolSize;
@Value("${spring.jms.listener.max-concurrency}")
private int maxPoolSize;
@Value("${spring.jms.pub-sub-domain}")
private boolean isPubSubDomain;
private final CachingConnectionFactory cachingConnectionFactory;
private final ObjectProvider<DestinationResolver> destinationResolver;
private final ObjectProvider<MessageConverter> messageConverter;
private final JmsProperties jmsProperties;
@Autowired
JmsListenerContainerConfiguration(ObjectProvider<DestinationResolver> destinationResolver,
ObjectProvider<MessageConverter> messageConverter,
JmsProperties jmsProperties,
CachingConnectionFactory cachingConnectionFactory) {
this.destinationResolver = destinationResolver;
this.messageConverter = messageConverter;
this.jmsProperties = jmsProperties;
this.cachingConnectionFactory = cachingConnectionFactory;
}
@Bean(name = { "jmsListenerContainerFactory4Topic" })
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Topic() {
DefaultJmsListenerContainerFactory factoryTopic = new DefaultJmsListenerContainerFactory();
factoryTopic.setConnectionFactory(cachingConnectionFactory);
JmsProperties.Listener listener = jmsProperties.getListener();
factoryTopic.setAutoStartup(listener.isAutoStartup());
if (listener.getAcknowledgeMode() != null) {
factoryTopic.setSessionAcknowledgeMode(listener.getAcknowledgeMode().getMode());
}
String concurrency = listener.formatConcurrency();
if (concurrency != null)
factoryTopic.setConcurrency(concurrency);
factoryTopic.setPubSubDomain(jmsProperties.isPubSubDomain());
factoryTopic.setCacheLevel(CACHE_CONSUMER);
factoryTopic.setReceiveTimeout(5000L);
factoryTopic.setTaskExecutor(threadPoolExecutor());
return factoryTopic;
}
@Bean(name = { "jmsListenerContainerFactory4Queue" })
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Queue() {
DefaultJmsListenerContainerFactory factoryQueue = new DefaultJmsListenerContainerFactory();
factoryQueue.setConnectionFactory(cachingConnectionFactory);
JmsProperties.Listener listener = jmsProperties.getListener();
factoryQueue.setAutoStartup(listener.isAutoStartup());
if (listener.getAcknowledgeMode() != null) {
//ack 的方式
factoryQueue.setSessionAcknowledgeMode(listener.getAcknowledgeMode().getMode());
}
String concurrency = listener.formatConcurrency();
if (concurrency != null)
factoryQueue.setConcurrency(concurrency);
//是否是订阅方式
factoryQueue.setPubSubDomain(jmsProperties.isPubSubDomain());
factoryQueue.setCacheLevel(CACHE_CONSUMER);
factoryQueue.setReceiveTimeout(5000L);
factoryQueue.setTaskExecutor(threadPoolExecutor());
return factoryQueue;
}
/**
* 用来设置consumer的taskExecutor。
**/
@Bean
public ThreadPoolTaskExecutor threadPoolExecutor() {
ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
//amq消费线程数
threadPoolExecutor.setCorePoolSize(poolSize);
threadPoolExecutor.setMaxPoolSize(maxPoolSize);
//队列容量
threadPoolExecutor.setQueueCapacity(10000);
threadPoolExecutor.setDaemon(true);
threadPoolExecutor.setKeepAliveSeconds(120);
threadPoolExecutor.setAllowCoreThreadTimeOut(true);
//线程名称前缀
threadPoolExecutor.setThreadNamePrefix("fhtMessageConsumer-");
return threadPoolExecutor;
}
}
网友评论