RabbitMQ的整体概括
RabbitMQ是对于AMQP(高级消息队列协议)的具体实现,是一个用于在分布式系统中存储转发消息的网络通信协议。
应用解耦此模型表示,消息中间件broker是消息的容器,它从生产者接收消息,并根据路由规则把消息投递给消费者。需要注意的是,生产者不是直接将消息投递给broker,而是通过路由键(RoutingKey)投递给交换机(Exchange),交换机绑定具体的队列(Queue),而消费者消费的消息只与队列有关,消费将会有推拉两种模式。
图示如下:
AMQP协议模型AMQP 协议层角色相关的概念:
- 生产者(producer):产生消息的应用,能够传递消息到消息中间件的应用。
- 消息中间件(brokers):消息传递的中间载体,即我们今天的主角 RabbitMQ。
- 消费者(consumers):接收并处理消息的应用。从消息中间件获取消息并处理。
- 连接(Connection):生产者或消费者和消息中间件之间需要建立起连接。AMQP 应用层协议使用的是能够提供可靠投递的 TCP 连接,AMQP 的连接通常是长连接,AMQP 使用认证机制并且提供 TLS(SSL)保护。当我们的生产者 或 消费者 不再需要连接到消息中间件的的时候,需要优雅的释放掉它们与消息中间件 TCP 连接,而不是直接将 TCP 连接关闭。
- 信道(channel):通常情况下生产者 或 消费者 需要与 消息中间件之间建立多个连接。无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源。AMQP 协议提供了信道(channel)这个概念来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个信道准备的。
消息中间件相关的概念:
- 虚拟主机(vHosts):虚拟主机概念,一个 Virtual Host 里面可以有若干个 Exchange 和 Queue,我们可以控制用户在 Virtual Host 的权限。
- 用户(User):最直接了当的认证方式,谁可以使用当前的消息中间件。
- 交换机(Exchange):交换机接收生产者发出的消息并且路由到由交换机类型和被称作绑定(bindings)的规则所决定的到队列中,交换机不存储消息。
- 消息(message):生产者产生的和消费者处理的消息。
- 路由键(routing key):路由关键字,交换机 exchange 的路由规则利用这个关键字进行消息投递到消息队列。(路由键长度不能超过 255 个字节)
- 绑定(Binding):Binding 可以理解为交换机 Exchange 路由消息到消息队列的路由规则关系(即消息队列和交换机的绑定)。当交换机 Exchange 收到生产者传递的消息 Message 时会解析其 Routing Key,Exchange 根据 Routing Key 与交换机类型 Exchange Type 将 Message 路由到消息队列中去。
SpringBoot整合RabbitMQ
简单了解了rabbitmq的概念,我们来实践一下与springboot的整合。
首先引入jar包。
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.7.9.RELEASE</version>
</dependency>
配置连接工厂
/**
* 创建连接工厂
* @return
*/
@Bean(name = "adapterConnectionFactory")
@Order(value = 2)
public ConnectionFactory adapterConnectionFactory() {
//创建连接工厂
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
//设置集群方式
connectionFactory.setAddresses(rabbitMQProperties.getAddress());
//connectionFactory.setHost(rabbitMQProperties.getHost());设置单节点方式
//设置端口
connectionFactory.setPort(rabbitMQProperties.getPort());
//设置用户名
connectionFactory.setUsername(rabbitMQProperties.getUsername());
//设置密码
connectionFactory.setPassword(rabbitMQProperties.getPassword());
//设置虚拟主机
connectionFactory.setVirtualHost(rabbitMQProperties.getVirtualHost());
//消息确认机制confirm-callback或return-callback,成功后confirm,失败后回调
connectionFactory.setPublisherReturns(true);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
消息发送的组件RabbitTemplate
/**
* 创建消息发送组件
* @return
*/
@Bean(name = "adapterRabbitTemplate")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(adapterConnectionFactory());
//exchange根据路由键匹配不到对应的queue时将会调用basic.return将消息返还给生产者
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//消息成功发送到broker
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
ApiLog.info("mq message send (ACK)status =", ack);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
//消息发送失败
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return rabbitTemplate;
}
监听器容器工厂SimpleRabbitListenerContainerFactory
/**
* 消费者监听
*
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(adapterConnectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//单台并发消费者数量
factory.setConcurrentConsumers(10);
//单台并发消费的最大消费者数量
factory.setMaxConcurrentConsumers(30);
//预取消费数量,unacked数量超过这个值broker将不会接收消息
factory.setPrefetchCount(5);
//有事务时处理的消息数
factory.setTxSize(1);
//消息确认机制
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
//构建retryConfig,用于在JavaConfig的模式下读取并发参数
RabbitProperties.AmqpContainer config = rabbitMQProperties.getListener().getSimple();
RabbitProperties.ListenerRetry retryConfig = config.getRetry();
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful());
//最大重试次数,消费者异常之后
builder.maxAttempts(retryConfig.getMaxAttempts());
builder.backOffOptions(retryConfig.getInitialInterval(),
retryConfig.getMultiplier(), retryConfig.getMaxInterval());
MessageRecoverer recoverer = (this.messageRecoverer != null
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build());
return factory;
}
重试参数说明,其他参数略
spring.rabbitmq.listener.simple.retry.enabled=true
//消费者异常之后的最大重试次数,JavaConfig方式需显示构建retryConfig
spring.rabbitmq.listener.simple.retry.max-attempts=4
spring.rabbitmq.listener.simple.retry.initial-interval=2000
spring.rabbitmq.listener.simple.default-requeue-rejected=true
最佳实践
初始化
@Autowired
private ConnectionFactory connectionFactory;
@Value("${mq.queue.callback_queue}")
private String callbackQueueKey;
@Value("${mq.exchange}")
private String exchange;
@PostConstruct
public void init(){
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
//声明exchange
Exchange topicExchange = new TopicExchange(exchange);
admin.declareExchange(topicExchange);
//声明queue
Queue callbackQueue = new Queue(callbackQueueKey, true);
admin.declareQueue(callbackQueue);
//Binding
admin.declareBinding(BindingBuilder.bind(callbackQueue)
.to(topicExchange)
.with(callbackQueueKey).noargs());
}
生产者
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(callbackQueueKey);
rabbitTemplate.convertAndSend(callBackRequest, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties properties = message.getMessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
properties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Obj.class);
return message;
}
});
消费者
@RabbitListener(queues = "queueName", containerFactory = "singleListenerContainer")
public void consumeMessage(@Payload Object obj, Channel channel, Message message) {
doSometing...
}
需要注意的是,消费者的异常没有捕获或抛出,或者catch块里出现异常,在消息确认机制是AUTO的前提下将会无限重试进入死循环,这个时候可以设置最大重试次数或手动进行ack来处理。
如果需要手动ack,需要实现ChannelAwareMessageListener
@Override
public void onMessage(Message message, Channel channel) throws Exception {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
以上只是项目使用中的简单阐述,除此还有exchange的4种模式,死信队列,消息堆积,可靠性投递机制等待补充。
网友评论