-
不同于JMS只是一种Java API,RabbitMQ实现了AMQP标准,与具体的编程语言无关。
-
AMQP-0-9-1协议事实上是一个编程模型,也即协议中的实体(包括exchange,queue,binding)通常由应用程序自行创建,而不是由rabbitmq的管理员创建。
-
RabbitMQ中有Exchange和Queue之分,Queue才是Message真正存储的地方,Queue通过Binding Key绑定到Exchange,Message只能直接发到Exchange,在发送时指定Routing Key,当Message的Routing Key和某个Queue的Binding Key匹配时,Message则从Exchange路由到Queue。Exchange有多种类型,不同的类型对于匹配的执行时不一样的,比如Direct类型的Exchange需要Binding Key和Routing Key的精确匹配,而Topic类型的Exchange则通过“*”和“#”匹配。
-
Exchange有4种类型的:
- direct,即通过message的routing key与queue的binding key直接精确匹配,如果相等则将message拷贝到queue中;
- topic,虽然也是message routing key和queue的biding key的匹配,但是此时可以通过“”和“#”做模糊匹配;“”表示1个单词,“#”表示0个或多个单词;
- fanout,即发到exchange的message会被拷贝到所有绑定的queue,此时message的routing key将被忽略;
- headers,即通过message中header值判断该message应该路由到哪个queue。
-
routing key必须为“以点(.)分割的单词组”,对于topic exchange来说,当queue的routing key中只有“#”时,此时的exchange即充当了一个fanout,当queue的routing key中既不包含“*”,也不包含“#”,此时的exchange即充当了一个direct。
-
对于direct的exchange,如果有多个queue通过相同的routing key绑定到了exchange,那么消息将同时到达多个quque。
-
rabbitmq有个默认的exchange,名字以空字符串("")表示,类型为direct,所有queue都以该queue自己的名字为binding key绑定到了默认exchange上。这样做为客户端提供了一种便利,即客户端只需要将message发到默认exchange,发送是指定routing key即为queue的名字即可,就好像message是直接发给某个queue一样。
-
AMQP中的消息是在consumer层面进行负载均衡,而不是queue级别,也即消息会会通过round-robbin的方式平均地发送给所有consumer。
-
在Consumer手动Confirm模式下,只有在rabbitmq发现consumer的connection、channel或者TCP连接断开的情况下,才会认为message没有正常投递,此时rabbitmq会立即将message发送给另外的consumer。无论consumer在处理message时需要多长的时间,只要上述连接没有断开,那么rabbitmq都认为是中正常情况而不会认为message没有被confirm。
-
queue属性:name表示名字,durable表示是否持久化,Exclusive=true表示只有一个connection使用该queue,并且该connection断掉之后queue自动删除,autodelete=true表示当queue上没有consumer时,queue将被删除。
-
在声明queue时,如果queue不存在则创建queue,如果存在则什么都不做。如果新声明的queue已存在但是声明的属性与原来queue的属性不一致,则将抛出channel级别的异常。
-
amqp可以自动生成queue名字,只要在声明时传入空值即可。
-
创建临时queue(自动删除的queue)有3中方式:1.设置queue为exclusive(此时queue只能被声明它的connection使用,当connection关闭或者TCP连接断开之后,exclusive queue将自动删除,通常情况下exclusive queue的名字都有服务器自动生成),2.设置queue的TTL,3.设置auto-delete。
-
可以对queue设置长度限制,可以对queue和message设置TTL。
-
只有当queue设置了最大priority时,queue中的message的priority才起作用,官网建议的priority是从1到10。
-
每一个consumer都有一个唯一标识,rabbitmq称其为consumer tag,通过consumer tag可以对consumer进行操作,比如取消某个consumer等。
-
对于新添加的consumer,如果queue中有message,那么message将立即被送往consumer。
-
consumer通常在程序启动的时候创建,并且通常持续到程序结束。
-
消息的push模式通过向queue注册一个consumer完成。
-
rabbitmq在投递消息给consumer时,会在message中加入以下property:
Property | Type | Description |
---|---|---|
Delivery tag | Positive integer | Delivery identifier, see Confirms. |
Redelivered | Boolean | Set to true if this message was previously delivered and requeued
|
Exchange | String | Exchange which routed this message |
Routing key | String | Routing key used by the publisher |
Consumer tag | String | Consumer (subscription) identifier |
- publisher在发送message时可能加上以下message property,其中多数都是可选的:
Property | Type | Description | Required? |
---|---|---|---|
Delivery mode | Enum (1 or 2) | 2 for "persistent", 1 for "transient". Some client libraries expose this property as a boolean or enum. | Yes |
Type | String | Application-specific message type, e.g. "orders.created" | No |
Headers | Map (string => any) | An arbitrary map of headers with string header names | No |
Content type | String | Content type, e.g. "application/json". Used by applications, not core RabbitMQ | No |
Content encoding | String | Content encoding, e.g. "gzip". Used by applications, not core RabbitMQ | No |
Message ID | String | Arbitrary message ID | No |
Correlation ID | String | Helps correlate requests with responses, see tutorial 6 | No |
Reply To | String | Carries response queue name, see tutorial 6 | No |
Expiration | String | Per-message TTL | No |
Timestamp | Timestamp | Application-provided timestamp | No |
User ID | String | User ID, validated if set | No |
App ID | String | Application name | No |
- 在注册consumer到queue时,可以将该consumer设置为exclusive,表示该queue只能有一个consumer,如果queue中已经有其他consumer时,设置exclusive会失败。
- 可以为consumer设置priority,priority高的consumer将优先获得message。
- consumer需要自行处理消息投递过程中的所有异常,异常需要被日志记录下来,然后ignore掉。
- 通常的client库中,都提供了thread pool用于异步的consumer操作。
- queue和message都可以设置message的TTL。
- 设置queue级别的messageTTL可以通过policy或者创建queue时指定message TTL参数x-message-ttl。
- message级别的TTL可以在publish消息的时候通过expiration参数设置。
- 当queue和message都设置有message的TTL时,较小的值生效。
- 可以设置queue本身的TTL,通过policy或者声明queue时设置x-expires参数。
- queue的最大长度可以通过最大消息数量或者最大字节数设置。
- 通过声明时设置queue参数或者创建policy可以设置queue的长度,二者同时设置时较大者生效。只有ready message用于计算queue的长度,unacknowledged message不会参与计算。
- 通过设置overflow参数可以控制queue达到最大长度时的行为,比如设置overflow=reject-publish时,会直接丢弃消息,此时如果publisher confirm启用,那么会通知publisher以nack消息,如果只有部分queue达到最大长度,那么同样会返回nack消息。
- 可以在声明queue时设置overflow参数,主要有两种值:reject-publish和drop-head(默认)。
- 在声明queue时,可以通过x-max-length参数指定queue的最大长度。
- lazy queue即尽量将message放到磁盘中,只有当有consumer要消费时才从磁盘加载到内存中。lazy queue通常用于超长queue,比如有时consumer离线或者consumer处理速度远远小于进queue速度。
- queue在持久化到磁盘时会阻塞,也即此时queue无法接收message,因此lazy queue会在很大程度上影响message接收效率。
- 声明queue时,可以通过x-queue-mode参数指定是否为lazy queue,x-queue-mode有两个值,一个是default,一个是lazy,默认情况下为default。
- 通过
channel.exchangeBind("destination", "source", "routingKey");
可以设置exchange之间的绑定。 - message在以下情况下可以被设置为放入死信exchange(DLX):1.消息被consumer拒绝接收(reject)并且requeue设置成了false;2.消息被consumer反面确认(nack)并且requeue设置成了false;3.由于TTL,消息过期;4.由于设置了queue的最大长度,消息被丢弃。
- DLX只是普通的exchange,可以通过常规方式进行声明。
- 可以通过policy或者声明queue时指定x-dead-letter-exchange参数来设置DLX,后者具有更高优先权。同时可以通过x-dead-letter-routing-key设置message放入DLX时的routing key,如果没有指定,那么message本身的routing key将被使用。
- 客户端在nack或reject时可以指定requeue参数,表示该message是否应该重新放入queue中,或者丢弃/DLX掉。
- nack和reject的区别,nack可以同时拒绝多条message,而reject只能拒绝一条消息。通常的做法是:将nack的消息重新放入队列(requeue),而将reject的消息直接丢弃或者DLX。
- 声明queue时通过设置x-max-priority可以将queue设置为priority queue,然后在发送message时,可以通过basic.properties中的priority指定message的priority。
- 在消费message时,可以通过指定 x-priority参数设置consumer的优先级,低优先级的consumer只有在高优先级的consumer被block后才能得到接收消息的机会。
- 创建connection时,rabbitmq使用以下默认参数:
Property | Default Value |
---|---|
Username | "guest" |
Password | "guest" |
Virtual host | "/" |
Hostname | "localhost" |
port | 5672 for regular connections, 5671 for connections that use TLS |
- guest用户只能连接localhost。
- channel和connection都是需要被close的,但是关闭connection会自动关闭channel。
- 通过passive方式声明queue:channel.queueDeclarePassive();此时只有queue事先存在才成功,否则抛出channel级别异常,整个channel无法使用。
- channe最好不要被多线程共享。
- consumer可以通过delivery tag进行ack,某个channel创建的consumer只能通过该channel进行ack。
- 被reject的message会被尽量放在先前的位置,如果不能则尽量放在里queue头部较近的位置。
- 一直requeue将消耗很多资源,因此consumer需要自行记录requeue的次数,然后根据需求处理。
- 当message被requeue时,message将多一个redelivered属性,如果再次消费失败,那么consumer可以根据该relelivered属性来决定应该继续requeue还是放入DLX。
- 每次message被DLX,其中x-death中的count属性都会递增,可以通过该值来设置消息的最大retry次数。
- publisher confirms 遵循以下原则:
- an un-routable mandatory or immediate message is confirmed right after the basic.return;
- otherwise, a transient message is confirmed the moment it is enqueued; and,
- a persistent message is confirmed when it is persisted to disk or when it is consumed on every queue.
- publisher confirm是异步的,confirm由connection的mainLoopThread线程完成,并不在publish线程完成。
- Spring AMQP在publish的时候,如果connection无法创建,比如server不可达,那么rabbitmqtemplate将直接抛出异常。
- In a cluster, nodes identify and contact each other using node names.
- When a node starts up, it checks whether it has been assigned a node name. This is done via the RABBITMQ_NODENAME environment variable. If no value was explicitly configured, the node resolves its hostname and prepends rabbit to it to compute its node name.
- For a CLI tool and a node to be able to communicate they must have the same shared secret called the Erlang cookie. The cookie is just a string of alphanumeric characters up to 255 characters in size. It is usually stored in a local file. The file must be only accessible to the owner (e.g. have UNIX permissions of 600 or similar). Every cluster node must have the same cookie.
- Policies automatically match against exchanges and queues, and help determine how they behave. Each exchange or queue will have at most one policy matching。
- Policies can be used to configure the federation plugin, mirrored queues, alternate exchanges, dead lettering, per-queue TTLs, and maximum queue length.
- RabbitMQ is multi-tenant system: connections, exchanges, queues, bindings, user permissions, policies and some other things belong to virtual hosts, logical groups of entities.
- The only concrete implementation we provide is CachingConnectionFactory, which, by default, establishes a single connection proxy that can be shared by the application. Sharing of the connection is possible since the “unit of work” for messaging with AMQP is actually a “channel”.
- CachingConnectionFactory既能用于cache channel,也能用于cache connection factory。通过设置ConnectionFactory的cacheMode参数可以指定是cache channel还是factory。 each call to createConnection() creates a new connection (or retrieves an idle one from the cache). Closing a connection returns it to the cache。
- RabbitTemplate的retry机制主要用于与broker的连接异常。
- 通过设置RabbitTemplate的usePublisherConnection可以将publish和consumer的connection分开。
网友评论