1.RabbitAutoConfiguration类解析

1.1类注解解析
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
@ConditionalOnClass
判断类路径中是否同时存在RabbitTemplate类和Channel类,只有两者都存在时该自动配置类才会生效
@EnableConfigurationProperties
启用rabbitmq的配置属性类RabbitProperties,也就是把application.properties中以spring.rabbitmq开头的配置绑定到该类上
@Import
导入RabbitAnnotationDrivenConfiguration类,见下文"RabbitAnnotationDrivenConfiguration类"一节的分析
1.2构造rabbitmq的连接工厂类

类图如下:

这里比较简单:如果spring容器中还不存在连接工厂的bean,就创建一个并注册到spring容器中
1.3 构造rabbitmqTemplate
@Configuration
//`@Import`导入`RabbitConnectionFactoryCreator`类,也就是1.2中注入的bean
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {
……
/**
 * Builds the RabbitTemplate bean from the given connection factory.
 * Applies, in order: the message converter (only if a unique one is available),
 * the mandatory flag, the retry template (only if retry is enabled),
 * the receive/reply timeouts, and the exchange and routing key taken from the
 * template properties.
 *
 * @param connectionFactory the connection factory the template is created with
 * @return the configured RabbitTemplate
 */
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
// Set the connection factory
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// Set the message converter; left untouched when getIfUnique() returns null
MessageConverter messageConverter = this.messageConverter.getIfUnique();
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}
template.setMandatory(determineMandatoryFlag());
RabbitProperties.Template properties = this.properties.getTemplate();
if (properties.getRetry().isEnabled()) {
// Set the retry policy
template.setRetryTemplate(createRetryTemplate(properties.getRetry()));
}
// Timeouts are applied only when non-null and are converted from Duration to milliseconds
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReplyTimeout);
map.from(properties::getExchange).to(template::setExchange);
map.from(properties::getRoutingKey).to(template::setRoutingKey);
return template;
}
1.3.1 注入连接工厂
1.3.2 注入消息转换器
MessageConverter messageConverter = this.messageConverter.getIfUnique();
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}
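getIfUnique()会判断spring容器中是否存在唯一的MessageConverter类型的bean(即自定义消息转换器的实现):存在则设置到RabbitTemplate中,否则保持RabbitTemplate默认的SimpleMessageConverter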
自定义实现例子为:
/**
 * Example of a user-defined configuration class that supplies a custom
 * message converter bean.
 */
@Configuration
public class MyAMQPConfig {
// Exposes a Jackson2JsonMessageConverter as the MessageConverter bean
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener,
ListenerContainerAware, PublisherCallbackChannel.Listener, Lifecycle, BeanNameAware {
……
// 默认的消息转换器
private volatile MessageConverter messageConverter = new SimpleMessageConverter();
……
1.3.3 重试策略的设置
- RabbitAnnotationDrivenConfiguration类
@Configuration
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
……
// Registers the default listener container factory.
// Conditions: no bean named "rabbitListenerContainerFactory" exists yet, and
// spring.rabbitmq.listener.type is "simple" or is not set (matchIfMissing = true).
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// The configurer applies its settings to the new factory, using the given connection factory
configurer.configure(factory, connectionFactory);
return factory;
}
……
// Empty configuration class whose only purpose is to carry @EnableRabbit.
// It is skipped when a bean with the listener annotation processor name already exists.
@Configuration
// Performs the bean import (via @EnableRabbit)
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {
}
/**
 * Type-level annotation, retained at runtime, that imports
 * RabbitBootstrapConfiguration.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
/**
 * Configuration that registers the two infrastructure beans used for
 * annotation-driven listeners: the listener annotation bean post-processor
 * and the listener endpoint registry.
 */
@Configuration
public class RabbitBootstrapConfiguration {
// Registered under the bean name: org.springframework.amqp.rabbit.config.internalRabbitListenerAnnotationProcessor
// This class is the crucial one: the logic that actually registers @RabbitListener lives here
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
return new RabbitListenerAnnotationBeanPostProcessor();
}
// Registered under the bean name: org.springframework.amqp.rabbit.config.internalRabbitListenerEndpointRegistry
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
}
- RabbitListenerAnnotationBeanPostProcessor类
public class RabbitListenerAnnotationBeanPostProcessor
implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
SmartInitializingSingleton {
……
//实现接口的执行顺序是:
setBeanClassLoader ->
setBeanFactory ->
setEnvironment ->
postProcessAfterInitialization(每一个spring容器中的bean都会执行这个方法) ->
afterSingletonsInstantiated





/**
 * Finishes configuring the registrar and then registers all listeners.
 * Steps: hand the bean factory to the registrar, let every
 * RabbitListenerConfigurer bean customize it, make sure an endpoint registry
 * is set, pass on the container factory bean name and handler method factory
 * when present, and finally call the registrar's afterPropertiesSet().
 */
@Override
public void afterSingletonsInstantiated() {
// Field declaration, for reference:
// private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
this.registrar.setBeanFactory(this.beanFactory);
// Let every RabbitListenerConfigurer bean found in the bean factory customize the registrar
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, RabbitListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class);
for (RabbitListenerConfigurer configurer : instances.values()) {
configurer.configureRabbitListeners(this.registrar);
}
}
// If no configurer supplied an endpoint registry, fall back to this processor's own,
// looking it up in the bean factory by its well-known bean name when it is not set yet
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
if (this.containerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
// Actually register all listeners
// Key method: this call is what performs the registration
this.registrar.afterPropertiesSet();
// clear the cache - prototype beans will be re-cached.
this.typeCache.clear();
}
// Reached through this.registrar.afterPropertiesSet() in afterSingletonsInstantiated();
// it simply delegates to registerAllEndpoints().
@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
/**
 * Registers a listener container with the endpoint registry for every
 * collected endpoint descriptor, then sets the startImmediately flag.
 * The whole operation runs while holding the lock on endpointDescriptors.
 */
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
// Iterate over every @RabbitListener endpoint descriptor
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
//这里进行监听器容器的配置设定,以及初始化,有点不懂了,后续再来撸
MessageListenerContainer container = createListenerContainer(endpoint, factory);
……
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
RabbitListenerContainerFactory<?> factory) {
//工厂类创建MessageListenerContainer
//见SimpleRabbitListenerContainerFactory类的createContainerInstance方法,将SimpleRabbitListenerContainerFactory工厂类与SimpleMessageListenerContainer实体类关联起来
//@Override
//protected SimpleMessageListenerContainer createContainerInstance() {
// return new SimpleMessageListenerContainer();
//}
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
if (listenerContainer instanceof InitializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
}
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
}
}
……
工厂类与实体类的类图


网友评论