介绍
RabbitMQ官网中介绍了6种工作模式,其中RPC不常用,本文主要介绍前5种。
简单模式(最简单的收发模式)

最简单的一个模式,开发者定义一个命名队列,然后publisher向这个命名队列中发送消息。最后consumer可以通过这个命名队列获取待处理的消息。
Exchange,Queue以及消息均没做持久化设置。
consumer设置了自动回复ack。
exchange type为direct
数据流:
- publisher往Exchange发送消息;
- 默认的Exchange绑定了命名queue - hello,所以Exchange往该queue发送消息;
- broker发现有consumer在监听queue,把消息 "推送" 到consumer。
Work Queues模式(资源的竞争)

简单模式中,consumer的任务只是取出message,然后打印,这个过程非常快。 但现实场景任务往往是各种各样,任务变成例如将word转换成pdf这些cpu密集型的任务,处理速度就没那么快,而且1个consumer同时只能处理1条message,当生产message速度大于消费速度时,就会造成message堆积在queue,吞吐量下降。这时就要添加更多consumer并行消费message,而且consumer可以动态伸缩。
exchange type为direct
message分发模式之1 - Round-robin 轮询
RabbitMQ默认采用轮询模式分发消息到各个consumer,例如有两个consumer,c1,c2,同时启动c1,c2从queue中获取任务,任务是按顺序依次分发到c1,c2,c1,c2... 有点类似是push模式。
message分发模式之2 - Fair 公平分发
轮询的分发方式其实不是恰当,例如有两个consumer,c1,c2,连接到RabbitMQ,如果采用轮询方式,c1只会接收编号为奇数的message,c2只会接收偶数的message,假如奇数message都是很重的任务,而偶数message都是很轻,那么很可能c1经常处于忙碌状态,而c2则经常处于空闲状态。 但RabbitMQ不知道各个consumer处理的情况,它只管第n条消息分发给第n个consumer。
为了解决这个问题,需要consumer告知RabbitMQ,在未处理完当前消息前,不要发新的消息发送过来。
相当于RabbitMQ不会假设第n条消息一定要发给第n个consumer,当某个consumer确认message处理完后,RabbitMQ才会把queue中最前面的消息取出并分发给这个consumer。
//当消费者有x条消息没有响应ACK时,不再给这个消费者发送消息
channel.basicQos(int x)
message确认
consumer处理message可能需要一定时间,如果在处理途中挂掉,那么message未处理完毕,就造成数据丢失。
RabbitMQ提供message 确认机制,consumer应该在确保自身处理完message后,再往RabbitMQ发送ack,这时RabbitMQ方可认为消息完成处理。 否则,一旦consumer死亡,例如断开了连接,channel关闭了,RabbitMQ会把当前分发给该consumer,但又未ack的message回收,重新分发给其他consumer。
这样即使consumer挂了,也不会造成message丢失。
message持久化
虽然解决了message可能会丢失问题,但是当RabbitMQ关闭或者崩溃时,message依然会丢失。
所以需要对queue和message做持久化保存。
PS:RabbitMQ是不允许对已经存在的queue进行参数设置,例如hello queue在创建时,durable参数设置为false,就不能改为true。
PS:即使设置了持久化选项,在非发布订阅模式下,message也不是每秒都会调用fsync命令刷盘。
Publish/Subscribe模式(共享资源)

简单模式和work queues模式本质都是一样,只有1个queue。message发送给exchange后,exchange判断为direct模式后,直接就将message转发到绑定的queue。而且1条消息只能发送给1个consumer。假如1条消息要被多个consumer消费呢。
这就需要用到Publish/Subscribe模式,该模式下
- publisher只创建type为fanout的命名exchange;
- consumer根据需要创建自己的queue,并且连接到publisher创建的exchange中,实施exchange与queue的绑定;如果queue不需要持久化,则可用临时queue;
- publisher发送message,message到达exchange后,exchange判断为fanout模式,就直接把message发送给与自己绑定的queue中;
- 每个queue将消息分发给与自己连接的consumer。
exchange type为fanout,比起direct,exchange与queue从1对1binding,转变为1对多binding。
Routing模式
Publish/Subscribe模式下,exchange会把message全部发送给与其bind的queue。
不同queue如果想有选择地过滤message,在fanout模式下是做不到的。
例如日志系统,queueA只接收warning,error级别日志,queueB只接收fatal日志。
这就需要用到routing key了。
PS:fanout模式下,发布message和bind exchange时routing key参数是会被自动忽略。
Direct Exchange
exchange type使用direct取代fanout,多个consumer自行创建queue,并且设置routing key绑定到exchange。

如图:
- routing-key为orange的message只会发送到Q1;
- routing-key为black 或 green的message只会放到Q2;
- 其他message都会被忽略丢弃。
Multiple bindings

如果多个consumer关注了同一个routing-key,那么实际就会表现如fanout模式。
routing-key为black的messag均会发送到Q1 Q2。
Topics模式(路由模式的一种)
routing模式依然存在一些限制,例如无法支持routing key模糊匹配
像unix的一个工具 syslog,它的路由规则是基于 日志级别(info/warn/crit...) + 设备来源(auth/cron/kern...)。
例如某个consumer需要监控来自auth的info和error message,如果用routing模式,就需要绑定 auth.info 和 auth.error两个key,不方便。
这就需要更强的Topics模式,其实就是Routing模式之上支持模糊匹配。
发送到topic exchange的routing-key必须是有规则的字符串,例如以.分隔的,并且每个分隔符内的字符串带有特征,例如"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。 routing-key长度最大为255字节。
topic其实与direct模式很相似,对于binding keys,topic模式多了两个特征
- "*" 星号可以替代任意一个字符;
- "#" 井号可以替代0到多个字符。

例如routing-key为描述动物的,格式为 "<speed>.<colour>.<species>".
那么两个queue可被描述为:
-
Q1关注所有橙色的动物;
-
Q2关注所有关于兔子和懒惰的动物。
-
"quick.orange.rabbit"的描述动物会被发送到两个queue;
-
"lazy.orange.elephant"会被发送到两个queue;
-
"lazy.brown.fox"只会发送到第二个queue;
-
"lazy.pink.rabbit"会被发送到第二个queue,且只发送一次;
-
"quick.brown.fox"会被丢弃;
-
"orange"会被丢弃;
-
"quick.orange.male.rabbit"会被丢弃;
-
"lazy.orange.male.rabbit"会被发送到第二个queue.
Topic exchange 也可以表现得和direct和fanout模式一样
- 当queue 的binding key设置为 "#"时,表现接收所有message,就如fanout exchange一样;
- 当queue 不使用 "#" 和 "*" 时,表现如direct一样。
网友评论