消息中间件
消息中间件就是在通信的上下游直接截断:break it,Broker
消息队列作用
- 秒杀场景的流量削峰
- 业务逻辑异步处理
- 模块解耦
RabbitMQ 架构
存储机制
- 持久化消息
- 非持久化消息
消息堆积性能下降
对于普通没有设置优先级和镜像的队列来说,backing_queue的默认实现是 rabbit_variable_queue,其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。
Q1 α :消息索引和消息内容都存内存,最耗内存,很少消耗CPU
Q2beta:消息索引存内存,消息内容存磁盘
delta: 消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作
系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,IO操作多,这样会增加 处理每个消息的平均开销
应对这一问题一般有3种措施:
- 增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。
- 采用multiple ack,降低处理 ack 带来的开销
- 流量控制
Connection和Channel的关系
RabbitMQ Broker建立 TCP连接,也就是 Connection
客户端创建 AMQP信道(Channel),信道建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP 指令都通过信道完成。
Channel复用 TCP连接
工作模式
- Work Queue,生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息。
- 发布订阅,使用fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个 Exchange,每个消费者都可以消费到完整的消息。
- 路由模式,使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey 。消费者定义队列并将队 、routingKey 、Exchange绑定。此时使用 direct 模式Exchange必须要 routingKey 完全匹配
- 主题模式,使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。
时使用通配符,交换器将消息路由转
- 生产者声明交换机,消息发送到交换机,消息带Routing Key。
- 消费者声明交换机,队列; 交换机、队列、Routing Key绑定。消费时消费队列的消息
// 生产者 声明fanout类型的交换器
channel.exchangeDeclare("ex.myfan", "fanout", true, false, null);
// 交换器,路由键,props,message
channel.basicPublish("ex.myfan",
"", // fanout类型的交换器不需要指定路由键
null,
("hello world fan:" + i).getBytes("utf-8"));
// 消费者 声明临时队列,队列的名字由RabbitMQ自动生成
final String queueName = channel.queueDeclare().getQueue();
channel.exchangeDeclare("ex.myfan",
BuiltinExchangeType.FANOUT,
true,
false,
null);
// fanout类型的交换器绑定不需要routingkey 参数 队列名称,交换器, Routing key
channel.queueBind(queueName, "ex.myfan", "");
channel.basicConsume(queueName, (consumerTag, message) -> {
System.out.println("One " + new String(message.getBody(), "utf-8"));
}, consumerTag -> {});
// 订阅模式
// 生产者, routingKey beijing.emp-bussiness.error(北京招聘业务错误日志)
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
// 消费者
final String queueName = channel.queueDeclare().getQueue();
channel.exchangeDeclare("ex.topic",
"topic",
true,
false,
null);
// fanout类型的交换器绑定不需要routingkey 参数 队列名称,交换器, Routing key
// 匹配 beijing.biz-online.error,# 匹配0到多个单词
channel.queueBind(queueName, "ex.topic", "beijing.#");
高级特性
RabbitMQ消息可靠性
- 客户端代码中的异常捕获,包括生产者和消费者
- AMQP/RabbitMQ的事务机制
- 发送端确认机制
- 消息持久化机制
- Broker端的高可用集群
- 消费者确认机制
- 消费端限流
- 消息接口幂等性
RabbitMQ集群
- 主备模式,只有主节点提供服务。主节点故障,备份节点才切换成主节点
- 主从模式
- 主主模式
- 分片集群
- 异地多活
网友评论