注解式:
在【RabbitMQ-10】@RabbitListener注解生效的源码分析中,bean在初始化的时候,解析@RabbitListener
注解,根据注解配置和SimpleRabbitListenerContainerFactory
创建SimpleMessageListenerContainer
对象。
注解式,注解配置的信息优先级高于配置文件的配置,当然也可以在代码中创建SimpleRabbitListenerContainerFactory
类并放入Spring容器中。
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
若使用自己在代码中配置的监听工厂,那么@RabbitListener
中要进行声明:
@Component
@Slf4j
public class CustomerRev {
//声明使用的的监听器工厂(不声明使用默认的工厂)
@RabbitListener(containerFactory = "singleListenerContainer", queues = {"kinson2"})
public void receiver5(Message msg, Channel channel) throws IOException, InterruptedException {
//打印数据
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("队列消费消息{}"+message);
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
}
}
配置式:
也可以创建SimpleMessageListenerContainer
类并放入到Spring容器中。在创建SimpleMessageListenerContainer
时,会配置监听的队列集合和监听方法。
@Slf4j
@Configuration
public class RabbitConfig {
//这种配置也可以去监听队列的消息
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new MySimpleMessageListenerContainer(connectionFactory);
//同时监听多个队列
container.setQueues(new Queue("kinson2"));
//设置当前的消费者数量
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(2);
//设置是否重回队列
container.setDefaultRequeueRejected(false);
//设置自动签收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置监听外露
container.setExposeListenerChannel(true);
//设置消息监听
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody(), "utf-8");
log.info("队列2—消费消息:" + msg);
}
});
return container;
}
}
网友评论