历史
今天聊聊消息队列
Advanced Message Queuing Protocol (AMQP)
AMQP:前进消息队列协议
历史:
消息队列来源已久,80年代最早在金融交易中,高盛等公司采用Teknekron公司的产品,当时的Message queuing软件叫做:the information bus(TIB)。
TIB被电信和通讯公司采用,路透社收购了Teknekron公司。之后,IBM开发了MQSeries,微软开发了Microsoft Message Queue(MSMQ)。
这些商业MQ供应商的问题是厂商锁定,价格高昂。2001年,Java Message queuing试图解决锁定和交互性的问题,但对应用来说反而更加麻烦了。
于是2004年,摩根大通和iMatrix开始着手Advanced Message Queuing Protocol (AMQP)开放标准的开发。
2006年,AMQP规范发布。2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ 1.0 发布。后来又被spring收购
AMPQ是使用Erlang开发的,Erlang这哥们主要用来搞游戏与电信方面的东西,ok,下面说说AMPQ又什么用?
应用
亦RabbitMQ Server:维持roducer到Consumer的路线,保证数据能够按照指定的方式进行传输。但是这个保证也不是100%的保证
Client A & B:数据的发送方,一个Message有两个部分:payload(有效载荷)和label(标签),payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,
它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。
Client 1,2,3:数据的接收方
Exchanges are where producers publish their messages.
Queuesare where the messages end up and are received by consumers
Bindings are how the messages get routed from the exchange to particular queues.
还有几个概念是上述图中没有标明的,那就是Connection(连接),Channel(通道,频道)。
Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力
一些说明
默认情况下,如果Message 已经被某个Consumer正确的接收到了,那么该Message就会被从queue中移除。当然也可以让同一个Message发送到很多的Consumer。
如果一个queue没被任何的Consumer Subscribe(订阅),那么,如果这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除。
那么什么是正确收到呢?通过ack。每个Message都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么:
RabbitMQ Server会把这个信息发送到下一个Consumer。
如果这个app有bug,忘记了ack,那么RabbitMQ Server不会再发送数据给它,因为Server认为这个Consumer处理能力有限。
而且ack的机制可以起到限流的作用(Benefitto throttling):在Consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance Consumer的load。
当然对于实际的例子,比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到。
4.2 Reject a message
有两种方式,第一种的Reject可以让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从queue中立即删除该Message。
4.3 Creating a queue
Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。
这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?
那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。
那么谁应该负责创建这个queue呢?是Consumer,还是Producer?
如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,还是为了数据不丢失,Consumer和Producer都try to create the queue!反正不管怎么样,这个接口都不会出问题。
queue对load balance的处理是完美的。对于多个Consumer来说,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不同的Consumer。
4.4 Exchanges
从架构图可以看出,Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。
有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。
· Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。
· Fanout exchange: 会向响应的queue广播。
· Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。
4.5 Virtual hosts
每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。
网友评论