AMQP的学习
本文转自:https://blog.csdn.net/letempsar/article/details/52565020
开源地址 :https://github.com/streadway/amqp
在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。
为什么使用AMQP或者AMQP解决了什么问题?
在分布式的系统中,子系统之如果使用socket连接进行通讯,有很多问题需要解决。比如:
1)信息的发送者和接受者如何维持这个连接,如果一方中断,这期间的数据如何防止丢失?
2)如何降低发送者和接受者的耦合度?
3)如何让优先级高的接受者先接到数据?
4)如何做到load balance?均衡接受者的负载?
5)如何将信息发送到相关的接收者,如果接受者订阅了不同的数据,如何正确的分发到接受者?
6)如何做到可扩展,将通信模块发到集群上去。
7)如何保证接受者接到了完整,正确或是有序的数据?
AMQP解决了这些问题。与此同时,基于AMQP实现的产品相比其他类似产品(AcitveMQ,Openfire)有着自己的特点。
产品优点缺点
OpenFire(XMPP)1.成熟稳定2.适合做IM服务器1.消息可靠性无保障2.路由策略不灵活3.集群模式不完善4.协议太重
ActiveMQ(JMS)1.成熟稳定2.与Java契合度高1.路由策略不灵活2.集群模式不稳定
RabbitMQ(AMQP)1.成熟稳定2.路由策略灵活3.消息传输可靠4.集群方案成熟1.配置多,学习和运维成本高
由上图比较可以看出,基于AMQP的RabbitMQ具有路由灵活,消息可靠等特点,当有路由策略多样化,和消息可靠传输的需求时可考虑使用基于AMQP的产品。
生产者(Producer):向Exchange发布消息的应用。 消费者(Consumer):从消息队列中消费消息的应用。 消息队列(Message Queue):服务器组件,用于保存消息,直到发送给消费者。 消息(Message):传输的内容。 交换器(exchange):路由组件,接收Producer发送的消息,并将消息路由转发给消息队列。 虚拟主机(Virtual Host): 一批交换器,消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。 Broker :AMQP的服务端称为Broker。 连接(Connection):一个网络连接,比如TCP/IP套接字连接。 信道(Channel):多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。 绑定器(Binding):消息队列和交换器直接的关联。
(1)建立连接Connection。由producer和consumer创建连接,连接到broker的物理节点上。 (2)建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel。producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。 (3)发送消息。由Producer发送消息到Broker中的exchange中。 (4)路由转发。exchange收到消息后,根据一定的路由策略,将消息转发到相应的queue中去。 (5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。 (6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。
exchange 将消息发送到哪一个queue是由exchange type 和bing 规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。 Direct exchange 直接转发路由,其实现原理是通过消息中的routkey,与queue 中的routkey 进行比对,若二者匹配,则将消息发送到这个消息队列。 Fanout exchange 复制分发路由,该路由不需要routkey,当exchange收到消息后,将消息复制多份转发给与自己绑定的消息队列。 topic exchange 通配路由,是direct exchange的通配符模式,消息中的routkey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配表达式的queue。 需要注意的一点只有queue具有保持消息的功能,exchange不能保存消息。
AQMP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:
比如公司新入职一个员工,需要开通系统账号,有几件事情要做,开通系统账号,发短信通知用户,发邮件给员工,在公司内部通讯系统中发送消息给员工。其中发短信,发邮件,发内部通讯系统消息,这三件事情可以串行也可以并行,并行的好处就是可以提高效率,这时可以应用MQ来实现并行。
在公司内部系统中,有人事系统,OA系统,财务系统,外围应用系统等等,当人事发生变动的时候(离职入职调岗),人事系统需要将这些变动通知给其他系统,这时只需人事系统发送一条消息,各个外围系统订阅该消息,就可得知人事变动,与实时服务调用相比,如果人事系统挂掉,各个外围系统不会受到影响,继续运行。
在有些流量会瞬间暴增的场景下,如秒杀,为了防止流量突然增大而使得应用挂掉,可以引入MQ,将请求存入MQ中,如果超过了MQ的长度,就把请求丢弃掉,这样来限制流量。
将消息队列引入到日志处理中,如kafka的应用,解决了大量日志的传输问题。日志客户端负责采集日志数据,并定期写入kafka队列,kafka负责接收,存储和转发日志,日志处理系统订阅并消费kafka中的日志数据。
AMQP 是一种协议, RabbitMQ是一个由erlang开发的AMQP的开源实现,目前使用比较广泛的MQ有RabbitMQ,ActiveMQ,KafKa等等,其中ActiveMQ是基于JMS的一个开源实现,JMS 是一个接口标准或者说是一个API消息服务的规范(JAVA Message Service,java消息服务),KafKa是一种高吞吐量的分布式发布订阅消息系统,通常有吞吐量需求的日志处理和日志聚合应用会使用Kafka,性能要优于Rabbit,但是稳定性和可靠性相对而言RabbitMQ要成熟一些。
网友评论