和RocketMQ一样,RabbitMQ作为消息中间件,主要负责接收、存储和转发消息。同时RabbitMQ也有生产者和消费者的概念,我们先来看看其整体模型架构。如下图:
1. 核心概念介绍
1.1 生产者和消费者
- Producer:生产者,投递消息的一方。
通常来说,一条消息可以包含2个部分:消息体和标签。消息体一般是一个带有业务逻辑结构的数据,而标签则用来描述这条消息,比如一个交换机的名称和一个路由键。
- Consumer:消费者,接收消息的一方。
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体,在消息路由的过程中,消息的标签会丢失,存入到队列中的消息只有消息体,实际上,消费者也不需要关心消息的生产者是谁。
- Broker:消息中间件所在的服务节点。
消息流转过程如下:
1.2 队列
- Queue:队列,是RabbitMQ的内部对象,用于存储消息。
RabbitMQ中消息都只能存储在队列中,这一点和RocketMQ不同,RocketMQ将消息存储在topic这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。
多个消费者可以订阅同一队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。(类似RocketMQ中的负载均衡模式)
RabbitMQ不支持队列层面
的广播消费(但并不是说RabbitMQ不支持广播消费),RocketMQ是支持广播消费的。
1.3 交换器、路由键、绑定
- Exchange:交换器,实际上消息生产者并不是直接将消息发送到队列的。而是将消息发送给交换器,由交换器将消息路由到一个或多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
其实从大的层面上来说,和RocketMQ消息投递的设计理念是一样的。在RocketMQ中,生产者也并不是直接往Broker投递消息的,而是通过NameServer和Topic共同决定投递到某个Broker中。
- RoutingKey:路由键,生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
- Binding:绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列中了。
这里可能对路由键和绑定键存在一定的理解混淆,其实很简单,绑定键就是一个连接交换器和队列的关键词,而路由键是交换器来根据自身类型以及绑定关系找到适配队列的关键词。
虽然RocketMQ中没有这样的概念,但是RocketMQ中也有类似的设计理念。生产者在发送一条消息时,需要指定Topic,还可以指定Tags。订阅了同一个Topic的消费者,还可以根据Tags来判断是否消费消息。也即间接的消费某一类消息,而不是消费这个Topic下所有的消息。
1.4 交换器类型
常用的交换器类型有fanout
、direct
、topic
这三种。
- fanout
这种类型的交换器,它会把生产者发送的消息路由到所有与该交换器绑定的队列中。也就是说,此时交换器不会去理会RountingKey
是什么了。
- direct
这种类型的交换器,它会把生产者发送的消息路由到那些BindingKey
和RoutingKey
完全匹配的队列中。
- topic
fanout
类型的交换器太过于随便,direct
类型的交换器又太过于严格。而topic
类型的交换器正是介于fanout
和direct
之间,和direct
类型的交换器类似,也是讲消息路由到BindingKey
和RoutingKey
相匹配的队列中,但这里的匹配规则有些不同,它允许BindingKey
中存在两种特殊字符串*
和#
用于做模糊匹配,其中*
用来匹配一个单词,#
用来匹配0个或多个单词。举例:
1)、路由键为"com.rabbitmq.client"的消息会同时路由到Queue1和Queue2中。
2)、路由键为"com.hidden.client"的消息只会路由到Queue2中。
3)、路由键为"com.hidden.demo"的消息只会路由到Queue2中。
4)、路由键为"java.rabbitmq.demo"的消息只会路由到Queue1中。
5)、路由键为"java.util.concurrent"的消息将会被丢弃或者返回给生产者。
2. RabbitMQ运转流程
了解以上的RabbitMQ架构模式以及相关概念,再结合《RabbitMQ:安装》中的例子回顾一下整个消息队列的使用过程。
-
生产者发送消息
- 生产者通过
ConnectionFactory
连接到RabbitMQ Broker(RocketMQ中,生产者和消费者是跟NameServer进行连接的),建立一个连接Connection
,开启一个信道Channel
。 - 生产者声明一个交换器并设置相关属性,比如交换器类型、是否持久化等。
- 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等。
- 生产者通过路由键将交换器和队列进行绑定。
- 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
- 相应的交换器根据接收到的路由键查看相匹配的队列。
- 如果找到,则将消息存入相应的队列中;否则根据生产者配置的属性选择丢弃还是回退给生产者。
- 关闭信道、关闭连接。
实际上,在生产环境,交换器、队列、绑定关系是提前创建好的。不需要在生产者发送消息时显示的声明交换器、队列和绑定关系。
- 生产者通过
- 消费者消费消息
- 消费者连接到RabbitMQ Broker,建立一个连接,开启一个信道。
- 消费者向RabbitMQ Broker请求消费相应队列中的消息。
- 等到RabbitMQ Broker回应并投递相应队列中的消费,消费者接收消息。
- 消费者确认接收到的消息。
- RabbitMQ从队列中删除相应已经被确认的消息。
- 关闭信息、关闭连接。
在这里,我们知道无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个信道Channel,每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条指令都是通过信道完成的。
我们完全可以直接使用Connection就能完成信道的工作,为什么还要引入信道呢?试想这样的一个场景,一个应用程序中有很多线程需要从RabbitMQ中消费消息或者生产消息,那么必然需要建立很多个Connection,也就是许多个TCP连接。然而对于操作系统而言,建立和销毁TCP连接都是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ采用类似NIO的做法,选择TCP连接富用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection,将这些信道均摊到这些Connection中。
网友评论