四种交换器
1 fanout
它会把所有发送到该交换器的消息路由到所有与该交换机绑定的队列中(Routing Key 会被忽略)
2 direct
direct类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中(全文匹配)
3 topic
可以使来自不同源头的消息能够到达同一个队列。可以使用通配符 (通配符匹配)
4 headers
header类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
Virtual hosts
每个Virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchange,和Binding rule等,这保证了你可以在多个不同的application中使用RabbitMQ
架构模型
AMQP的模型架构和RabbitMQ的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。
应用场景架构图,Message进入Exchange,通过routing key把消息放到对应的queue里
RabbitMQ Server: 也叫broker server
Connection
就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。
Channels
虚拟连接。它建立在上述的TCP的连接上,数据流动都是在Channel中进行的,也就是说,一般情况下程序起始建立TCP连接,第二步就是建立这个Channel。
消息基于什么传输(为什么使用Channel,而不是直接使用TCP连接)
由于TCP连接的创建和销毁开销较大,且并发数收系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真是的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。
消息什么情况下会丢失
1 .发送消息的交换机并没有绑定任何队列,消息将会丢失
2.交换机绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配
3.消息没有设置为手动确认,消息到达了消费者,消费时抛出了异常。
如何保证消息不丢失
1.消息持久化(前提是队列必须持久化)
queue(默认持久化)、message,设置了queue和message的持久化之后,当重启broker服务重启之后,消息依旧存在。只设置了队列持久化,重启只有消息会丢失,但是如果只设置了消息持久化,重启之后队列消失,继而消息也会丢失,所以只设置消息持久化而不设置队列持久化显得毫无意义。
exchange(默认持久化),不设置exchange的持久化不会影响对消息的可靠性有什么影响,但是如果exchange不设置持久化,那么当broker服务重启之后,exchange将不存在,那么发送方就无法正常发送消息。
Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息 (尚不清楚是否默认持久化,是不是Message持久化就是要求消息写入磁盘)
2.ACK确认机制(如何确保消息正确地发送至RabbitMQ,如何确保消息接收方消费了消息)
发送方确认模式:
/**
* rabbitmq发送消息工具类
**/
@Component
@Slf4j
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback {
private RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMqSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm->id:{}", correlationData.getId());
if (ack) {
log.info("确认发送成功通知,id:{}",correlationData.getId());
} else {
log.error("确认发送异常通知:id:{},cause:{}", correlationData.getId(), cause);
}
}
}
将信息设置成confirm(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者
消息写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)
// 注意: 配置文件中需要配置 spring.rabbitmq.publisher-confirms : true 表示开启生产者确认模式
这里也可以使用事务机制来确保消息一定会发送到RabbitMQ
1. 配置事务
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory cachingConnectionFactory){
return new RabbitTransactionManager(connectionFactory());
}
2. 初始化时,通过 rabbitTemplate.setChannelTransacted(true)开启事务
@PostConstruct
private void init() {
rabbitTemplate.setChannelTransacted(true);
}
3.在发送消息的方法上加上@Transactional 注解,这样在该方法中发生异常时,消息将不会发送
接收方确认机制
@Bean("topicContainer")
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RabbitMqEnum.QueueName.QUEUE_1.getCode()); //监听的队列
container.setMessageListener(exampleListener());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动开启ack a处
container.setExposeListenerChannel(true);
// container.setConcurrentConsumers(5);
// 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则改consumer将block调,知道有消息ack
container.setPrefetchCount(5);
return container;
}
@Bean("topicListener")
public ChannelAwareMessageListener exampleListener() {
return new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
try {
byte[] body = message.getBody();
log.info("topic receive msg: {}", new String(body));
int i = 1 / 0; // TODO 如果开启手动确认(a处和b处),而此处抛出异常,则消息依然会保存在mq中
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//确认消息成功消费 b处
} catch (Exception e) {
log.error("e:{}", e.getMessage());
}
}
};
}
接收方消息确认机制:
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同的操作)。只有消息这确认了消 息,Rabbitmq才能安全地把
消息从队列中删除,这里并没有用到消息超时机制,Rabbitmq仅通过Consumer的连接中断来确认是否需要重新发送消息,也就是说。
只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息,保证数据的最终一致性
消息确认模式有:
AcknowledgeMode.NONE: 自动确认
AcknowledgeMode.AUTO: 根据情况确认
AcknowledgeMode.MANUAL: 手动确认
3.设置集群镜像模式
4.消息补偿机制
延迟队列
@Bean
public Queue delayQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", Constants.IMMEDIATE_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", Constants.IMMEDIATE_ROUTING_KEY);
return new Queue(Constants.DELAY_QUEUE, true, false, false, params);
}
把延迟队列消息转发到Constants.IMMEDIATE_EXCHANGE交换机,在消费的时候只需要监听Constants.IMMEDIATE_EXCHANGE对应的队列即可
如何避免消息重复投递或重复消费
在消息生产时,MQ内部针对每一条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付Id,订单Id等)作为去重的依据,避免同一条消息被重复消费。
消息如何分发
通过路由可以实现多消费的功能
单生产者多消费者
要实现多个消费者消费同一个队列只需要启动两个消费者监听相同的队列即可
两个消费比率接近 1:1 (如何做到的)
@RabbitListener(queues = "QUEUE_2", containerFactory = "rabbitListenerContainerFactory")
public void queue2_1(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag, Channel channel) {
log.info("queue2_1: receive:{}", new String(message.getBody()));
try {
log.info("业务逻辑处理1。。。");
channel.basicAck(deliverTag, false);
} catch (Exception e) {
log.error("e:{}", e.getMessage());
}
}
@RabbitListener(queues = "QUEUE_2", containerFactory = "rabbitListenerContainerFactory")
public void queue2_2(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag, Channel channel) {
log.info("queue2_2: receive:{}", new String(message.getBody()));
try {
log.info("业务逻辑处理2。。。");
channel.basicAck(deliverTag, false);
} catch (Exception e) {
log.error("e:{}", e.getMessage());
}
}
消息怎么路由
消息提供方 -> 路由 -> 一至多个队列
消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定
通过队列路由键,可以把队列绑定到交换器上。
消息达到交换器后,Rabbitmq会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)
MessageConvert
消息什么时候需要持久化
1.消息本身在publish的时候就要求消息写入磁盘;
2.内存紧张,需要将部分内存中的消息转移到磁盘
消息什么时候回刷到磁盘
1.写入文件前会有一个Buffer,大小为1M(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)
2.有个固定的刷盘时间: 25ms, 也就是不管Buffer满不满,每隔25ms,Buffer里的数据和未刷新到磁盘的文件内容必定会刷到磁盘。
3. 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷新到磁盘: (只要进程的信箱里没有消息,则产生一个timeout消息,这个timeout消息会触发刷盘操作)
使用RabbitMQ的好处
服务间高度解耦
异步通信性能高
流量削峰
使用RabbitMQ的缺点
系统可用性降低
系统引入的外部依赖越多,越容易挂掉
系统复杂性提高
怎么保证消息没有重复消费,怎么处理消息丢失的情况,怎么保证消息的顺序性,等一系列问题都需要解决
一致性问题
A系统处理完了直接返回成功了,便认为这个请求就成功了;但问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,造成了数据不一致了
问题
1.@RabbitListener 如何设置手动ack
2. Message 是否默认设置了持久化(应该怎么设置Message的持久化)
问题解决
1.@RabbitListener 如何设置手动ack
@RabbitListener(queues = "QUEUE_2",containerFactory = "rabbitListenerContainerFactory")
/**
*
* @desc 设置消费者为手动确认 配合 @RabbitListener 使用
* @date 2019/11/7 11:29
*/
@Configuration
public class ConsumerConfiguration {
@Bean("rabbitListenerContainerFactory")
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory ();
container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动开启ack
container.setPrefetchCount(5);
return container;
}
}
Fanout 的广播模式体现在哪里
经测试Topic和Fanout 模式下都可以有多个消费者,而这些消费者都是轮流消费消息,所以Fanout的广播模式体现在那个地方
消费端丢数据(启动手动确认模式可以解决)
指定 Acknowledge的模式
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: manual # 表示该监听器手动应答消息
三种确认模式:
1.自动确认模式
消费者挂掉,待ack的消息回归到队列中,消费者抛出异常,消息会不断的被重发,直到处理成功,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试
2.手动确认模式
1.使用手动确认应答消息,千万不能忘记应答消息(channel.basicAck()),对于Rabbitmq来说处理消息没有超时,只要不应答消息,他就会认为仍在正常处理消息,导致消息队列出现阻塞,影响业务执行
2.如果消费者来不及处理就死掉时,没有响应ack,会在项目启动后重复发送一条信息给其他消费者
3. 可以选择丢弃消息,这其实也是一种应答,(channel.basicnNack(deliveryTag,false,false))
4.如果消费者设置了手动应答模式,并且设置了重试,出现异常时无论是否捕获了异常,都是不会重试的
5.如果消费者没有设置手动应答模式,并设置了重试,那么在出现异常时,如果没有捕获异常则会进行重试,如果捕获了异常不会重试
3.不确认模式
acknowledge="none" 不使用确认机制,只要消息发送完成就会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发
当出现异常时,需要把这个消息回滚到消息队列
1.ack返回false,并重新回到队列,
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
2. 当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍然在队列头部,会导致消费者立马又接收到这条消息进行处理,接着抛出异常,进行回滚,反复进行,这种情况会导致队列出现阻塞,消息堆积,导致正常的消息也无法运行,所以对于这种情况,我们就需要把回滚的消息放到队列尾部。
// 手动进行应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//重新发送消息到队列尾部
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(new Object()));
Rabbitmq的限流
限制MQ的消费速率
Fanout广播体现在哪里
网友评论