队列Queue
订阅消息通过AMQP的basic.consume进行持续订阅,而单条的使用basic.get,其原理仍然是订阅消息队列,但是在获取到一条消息之后就会断开,这样一次只获取一条。
消息订阅需要消费者的确认:basic.Ack命令显示的向rabbitmq服务器进行消息确认。消费者在消费或者拒绝最近接收的一条消息后,就能从队列中自动获取到下一条消息
接收到消息后,在确认之前断开连接,认定为消息未分发,会重新发给另一个消费者,另一方面:在接收消息后不确认,rabbitmq将不会再发送其他消息,从而防止过载
在接收消息后,拒绝消息,
1.使用basic.reject 命令,表示拒绝该消息,命令中的参数requeue是true,则表示重新分发。如果为false,则表示移除消息,移除消息还有一个方法是使用basic.ack进行消息确认,这样就可以简单的移除消息啦。
2.断开连接,这样会认定消息未分发而发给另一个消费者
- 创建队列:
AMQP中的queue.declare进行创建 队列设置的参数 exclusive ,auto-delete passive设置为true时,可以检查队列是否存在
交换器Exchange和绑定Binding
-
绑定键(binding key) 队列通过绑定键绑定到交换器,而交换器负责接收消息时如何把消息分配到对应路由。消息进入服务器,消息将拥有一个路由键,交换器通过消息路由键和队列注册绑定键进行匹配,从而进行分发!
-
交换器:direct fanout,topic,headers。headers交换器和direct功能相类似,只是在匹配时时按照消息的header而不是路由键进行,效率差。因此没有再使用了。
1.direct类型交换器是直接交换器:如果路由键==绑定键,就路由到对应的队列。这个交换器中的路由键是在绑定队列到exchange时使用的bindingkey的集合(可以设置多个)
image.png
image.png服务器中会有一个default的默认交换器,在不声明exchange直接binding时,会把Queue绑定到default的exchange上,该exchange的bindingkey==Queue.Name。在publish消息时,publish中的exchange为空,直接publish指向Queue,这样就会直接发送消息到default exchange,exchange自动给message添加上Queue.Name作为路由键。
2.fanout交换器,把消息广播发送到该交换器上的所有附加队列,这样每个队列都获取到消息,从而进行不同的逻辑处理。
3.topic交换器:通过设置binding key作为通配规则来进行消息路由匹配,如:.message,表示后缀为.message的所有消息都会发送进来,‘#’表示所有规则匹配,’‘表示任意字符,中间的‘.’则表示了分隔功能.topic功能可以通过主题匹配模式进行分发路由,这时publish消息都是publish到exchange,并且给定routing key,由exchange来进行route到不同Queue中!
Vhost 虚拟主机和隔离机制:
vhost是amqp的基础,必须在连接时进行指定。默认虚拟主机是“/”
队列的arguments
队列在declare时可以设置一个args集合,用来定义queue的一些属性。
名字 | key | 说明 |
---|---|---|
messageTTL | x-message-ttl | 参数的value是一个毫秒单位的数字,但是要注意太大的数字可能会导致失效,该参数也可以用在发送消息时作为properties添加,对单条信息设置过期时间 |
auto expire | x-xipires | value是毫秒单位的数字,表示队列自动删除前的空闲等待时间 |
Max length | x-max-length | 设置队列中可以拥有的消息的数量,超过这个数量将会从队列头开始丢弃、若有死信exchange,则会推送到dlx中 |
Max length btyes | x-max-length-bytes | value是单位byte的数字,表示队列能存储最多多少大小的信息。和maxlength作用相同,一个是个数,一个是大小,两个同时可用,先到达哪个限制都可以开始生效,同样是超过队列大小限制后,从队列头开始丢弃消息 |
overflow behavior | x-overflow | value是{“drop-head”,"reject-publish","reject-publish-dlx"},如果队列是quorum类型的则只能是“drop-head” |
dead letter exchange | x-dead-letter-exchange | value是死信交换器的名字,当队列中的消息1.被消费者basic.reject/nack,requeue设置为false时,消息会推送到DLX中;2.消息过期;3.队列overflow之后(此时x-overflow=“drop-head”) |
dead letter routing key | x-dead-letter-routing-key | value是一个路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值。 |
single active consumer | x-single-active-consumer | value 是true、false,表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false) |
maximum priority | x-max-priority | value是一个数字(0255,推荐110),表示队列中的消息优先级属性的分级,开启了priority的队列,在发送消息到该类队列时可以设置message属性:priority的大小(默认不开启) |
lazy mode | x-queue-mode | value是{“default”,“lazy"},默认模式是消息尽量维护在内存中,lazy模式则会尽早的把内存中的消息持久化到磁盘上,减小内存开销 |
master locator | x-queue-master-locator | 在集群模式下设置镜像队列的主节点信息:1.random:随机一个节点作为主节点;2.min-masters:表示主节点配置在拥有最少主节点的集群节点上;3.client-local使用与client向连接的集群节点作为主节点 |
集群和镜像队列
rabbitmq的队列在集群模式下,默认是只存在在单个集群cluster上的,而exchange和bindings信息则是所有集群cluster上进行同步且保持一致性。
如何实现队列的高可用?
队列queues可以配置镜像队列,镜像队列是在多个cluster上设置相同slave,实现master-slave模式。镜像
镜像队列的所有操作都在master上,而不做负载均衡工作,slave仅是用作可用性的备份工作,在master不可用时,资历最老的slave会晋升为master。
- master配置
可以在queue的定义arguments中通过master locator定义队列的master节点选举确定策略。 -
镜像配置
镜像配置主要通过设置policy来实现。
网站管理界面
配置policy
在定义policy时,主要参数ha-mode和ha-params来控制镜像队列的配置情况,ha-sync-mode来设置消息同步策略
ha-mode | ha-params | Result |
---|---|---|
all | 不需要设置 | 在集群中的每个cluster上都设置镜像。当一个节点添加到集群中时,这个节点同样会有相应的镜像(镜像会根据policy设置的进行自动添加) |
exactly | 具体数量 | 指定在集群中设置镜像队列的节点个数。如果集群中节点的个数小于count的值,那么所有的节点都会配置镜像。如果其中一个节点上的镜像挂掉,那么会在另一个节点上生成新的镜像,以保持镜像队列的格式 ha-mode:exactly和ha-promote-on-shutdown:always一起使用将会很危险 |
nodes | cluster 的id list | 在指定的节点列表上设置镜像。节点名称可以通过rabbitmqctl cluster_status命令获取,通常名称是“rabbit@hostname”的这种形式。如果这些指定的节点都处于不可用状态(宕机或者关闭服务等),那么客户端程序会在自己所连接的那么节点上创建queue。 |
参数 | 值 | 说明 |
---|---|---|
ha-sync-mode | automatic/manual | (默认)manual模式在新的镜像队列加入时,不同步以前的消息,仅仅从加入后开始镜像,automatic模式则会自动同步当前所有的消息到镜像队列,注意:在进行同步消息时,会阻塞队列,直到同步完成 |
网友评论