1 消息服务的出现
后台微服务的相互调用一般有以下几种。
种类 | 协议 | 支持 | 优缺点 |
---|---|---|---|
restful api、soap、graphql等 | http/json、xml | 实现了http的语言都支持 | 通用性强,不需要额外的学习成本,但是性能比较差。 |
grpc | http2.0 /Protobuf | 常用的开发语言 | 基于http2请求和Protobuf数据解析,性能高于http,但是需要编写idl。 |
thrift | TFramedTransport/TBinaryProtocol 等 | 基本全语言 | thrift基本socket封装,可以根据需求选用不用的传输协议和数据解析格式,性能高于grpc,也需要idl。 |
nanomsg、kafka、rabbitmq等 | 各种mq协议 | 基本全语言 | 不同的mq适用与不同的场景,但是都是为了解耦,并且消费者可以扩展。 |
2 AMQP(高级消息队列协议)
在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP的标准实现。
2.1 规范文档
以下为选取部分文档中重点来解析
2.2 模型
消息从"发送端"(publisher)把消息发布到"交换器"(exchange),通常比邮局或邮箱。"交换器"根据"路由关键字"(routing-key)去绑定(binding)一个队列(queue)。然后AMQP代理(broker)向消费者(consumers)传递消息订阅队列(queues),或消费者从队列获取消息。
hello-world-example-routing.pngThe AMQP 0-9-1 Model has the following view of the world: messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then AMQP brokers either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand.
2.2 message、producter、consumer
message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
producter(Publisher): 生产者,一般指生产消息的一端。
consumer: 消费者,一般指消费消息的一端。
2.2 Broker
消息队列服务器的实体,也可以理解为代理,不管消费者还是生产者,都需要连接到broker,才能进行生产消费。
2.3 exchange and exchange types
rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange是做消息分发,producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchagne的4种type去定义的。
type | 创建vhost时默认创建的exchange 的名称 |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
exchange还有以下属性:
attribute | type | describe |
---|---|---|
name | string | exchange的名称 |
Durability | bool | exchange是否持久化 |
Auto-delete | bool | 当所有绑定队列都不再使用时,是否自动删除该交换器 |
Arguments | object | 使用 broker-specific 时候的参数 |
exchange 的type
-
Default Exchange
default exchange(默认交换器)是没有名字的direct exchange。name为空字符串。所有queue都默认binding 到该交换器上。所有binding到该交换器上的queue,routing-key都和queue的name一样。例如: 当创建一个name="search-indexing-online"的queue,broker会把改queue绑定到name="search-indexing-online"的routing-key上。因此消息发送到default exchange并且匹配到search-indexing-online的router,则该消息被送到search-indexing-online的queue。 -
Direct Exchange
direct exchange(直接交换器)是理想的单播路由的消息交换(尽管它们可以用于多播路由)。例如一个queue 绑定到"router key" =K的direct exchange上,那么当发送一个router key为R的message到该direct exchange,那个该消息会推送到"router key"=K的queue上。
-
Fanout Exchange
fanout exchange(展开交换器),该交换器会把消息发送到所有binding到该交换器上的queue。这种是publisher/subcribe模式。用来做广播最好。
image.png -
Topic Exchange
topic exchange(通配符交换器),exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。这里的routingkey可以有通配符:'','#'。其中''表示匹配一个单词, '#'则表示匹配没有或者多个单词 -
Header Exchang
header exchang(自定义交换器),根据自定义的header attribute去匹配不同的queue。
2.4 Quue
queue(队列,task-queueing系统),主要存储消息被提供消费者进行消费。
queue还有以下属性:
attribute | type | describe |
---|---|---|
name | string | queue的名称 |
Durability | bool | queue是否持久化 |
exclusive | bool | 当消费者断开连接后是否删除该队列 |
Auto-delete | bool | 当所有消费客户端连接断开后,是否自动删除队列。 |
Arguments | object | 使用 broker-specific 时候的参数 |
2.5 bindings
exchange和queue通过routing-key关联,这两者之间的关系是就是binding。
2.6 Message Acknowledgements
消息应答。执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
消息应答是默认打开的。我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
2.7 Rejecting Messages
拒绝消息。当消费者应用程序收到消息时,该消息的处理可能会成功,也可能不会成功。 消费者可以通过拒绝消息向代理指出消息处理失败(或当时无法完成)。 当拒绝消息时,消费者可以要求代理丢弃或重新发送消息。 当队列中只有一个消费者时,确保您不会通过一次又一次地拒绝并重新发送来自同一个消费者的消息来创建无限的消息传递循环。
2.8 Negative Acknowledgements
拒绝应答。消费者使用 basic.reject拒绝消息,则该消息为Rejecting Messages。AMQP
只能一次拒绝一条消息,但是如果用的rabbitmq则可以拒绝多个消息。参考地址
2.9 Prefetching Messages
预取消息。指定channel(通道)的等待处理的消息个数,如果等待的消息已经达到该值,则该消费者不再接受新的消息。默认的channel不限制个数。最好的方式是设置该值在一个合理的数值,达到多消费者之间的简单负载均衡。
2.10 Message Attributes and Payload
消息的属性和有效载荷(携带的数据)。
某些属性由AMQP代理使用,但大多数属性可以接收它们的应用程序使用。有些属性是可选的,称为标题。它们与HTTP中的X-Headers类似。邮件发布时设置邮件属性。
AMQP消息也有一个有效负载(它们携带的数据),AMQP代理将其视为一个不透明的字节数组。经纪人不会检查或修改有效载荷。消息可能只包含属性而没有有效载荷。使用JSON,Thrift,Protocol Buffers和MessagePack等序列化格式来序列化结构化数据以便将其发布为消息有效载荷是很常见的。 AMQP同伴通常使用“内容类型”和“内容编码”字段来传达这些信息,但这只是惯例而已。
消息可能会作为持久性发布,这会使AMQP代理将它们保存到磁盘。如果服务器重新启动,系统会确保接收到的持久性消息不会丢失。简单地将消息发布到持久交换或者将其发送到队列的事实是持久的并不会使消息持久化:这完全取决于消息本身的持久模式。将消息发布为持久性会影响性能(就像使用数据存储一样,持久性在性能上会带来一定的成本)。
Content type
Content encoding
Routing key
Delivery mode (persistent or not)
Message priority
Message publishing timestamp
Expiration period
Publisher application id
2.11 Connections
AMQP是一种使用TCP进行可靠传输的应用程序级协议。 AMQP连接可以使用身份验证,并且可以使用TLS(SSL)进行保护。 当应用程序不再需要连接到AMQP代理时,它应该正常关闭AMQP连接,而不是突然关闭底层TCP连接。
2.12 Channels
某些应用程序需要多个连接到AMQP代理。 但是,不希望同时打开多个TCP连接,因为这样做会占用系统资源并使配置防火墙变得更加困难。 AMQP 0-9-1连接可被认为是“共享单个TCP连接的轻量级连接”的通道复用。
对于使用多个线程/进程进行处理的应用程序,通常为每个线程/进程打开一个新通道并且不共享它们之间的通道。
特定通道上的通信与另一个通道上的通信完全分离,因此每个AMQP方法都会携带一个通道号,客户端可以使用该通道号来确定该方法适用于哪个通道。
2.13 Virtual Hosts
为了使单个代理可以托管多个孤立的“环境”(用户组,交换,队列等),AMQP包含虚拟主机(虚拟主机)的概念。 它们与许多流行的Web服务器使用的虚拟主机相似,并提供AMQP实体所处的完全隔离的环境。 AMQP客户端指定在AMQP连接协商期间他们想要使用哪些虚拟主机。
3 rabbitmq
rabbitmq是使用erlang实现AMQP规范的消息代理。支持各种语言的客户端。
网友评论