美文网首页
RabbitMQ学习笔记

RabbitMQ学习笔记

作者: 无知者云 | 来源:发表于2019-05-05 22:53 被阅读0次
  • 不同于JMS只是一种Java API,RabbitMQ实现了AMQP标准,与具体的编程语言无关。

  • AMQP-0-9-1协议事实上是一个编程模型,也即协议中的实体(包括exchange,queue,binding)通常由应用程序自行创建,而不是由rabbitmq的管理员创建。

  • RabbitMQ中有Exchange和Queue之分,Queue才是Message真正存储的地方,Queue通过Binding Key绑定到Exchange,Message只能直接发到Exchange,在发送时指定Routing Key,当Message的Routing Key和某个Queue的Binding Key匹配时,Message则从Exchange路由到Queue。Exchange有多种类型,不同的类型对于匹配的执行时不一样的,比如Direct类型的Exchange需要Binding Key和Routing Key的精确匹配,而Topic类型的Exchange则通过“*”和“#”匹配。

  • Exchange有4种类型的:

    • direct,即通过message的routing key与queue的binding key直接精确匹配,如果相等则将message拷贝到queue中;
    • topic,虽然也是message routing key和queue的biding key的匹配,但是此时可以通过“”和“#”做模糊匹配;“”表示1个单词,“#”表示0个或多个单词;
    • fanout,即发到exchange的message会被拷贝到所有绑定的queue,此时message的routing key将被忽略;
    • headers,即通过message中header值判断该message应该路由到哪个queue。
  • routing key必须为“以点(.)分割的单词组”,对于topic exchange来说,当queue的routing key中只有“#”时,此时的exchange即充当了一个fanout,当queue的routing key中既不包含“*”,也不包含“#”,此时的exchange即充当了一个direct。

  • 对于direct的exchange,如果有多个queue通过相同的routing key绑定到了exchange,那么消息将同时到达多个quque。

  • rabbitmq有个默认的exchange,名字以空字符串("")表示,类型为direct,所有queue都以该queue自己的名字为binding key绑定到了默认exchange上。这样做为客户端提供了一种便利,即客户端只需要将message发到默认exchange,发送是指定routing key即为queue的名字即可,就好像message是直接发给某个queue一样。

  • AMQP中的消息是在consumer层面进行负载均衡,而不是queue级别,也即消息会会通过round-robbin的方式平均地发送给所有consumer。

  • 在Consumer手动Confirm模式下,只有在rabbitmq发现consumer的connection、channel或者TCP连接断开的情况下,才会认为message没有正常投递,此时rabbitmq会立即将message发送给另外的consumer。无论consumer在处理message时需要多长的时间,只要上述连接没有断开,那么rabbitmq都认为是中正常情况而不会认为message没有被confirm。

  • queue属性:name表示名字,durable表示是否持久化,Exclusive=true表示只有一个connection使用该queue,并且该connection断掉之后queue自动删除,autodelete=true表示当queue上没有consumer时,queue将被删除。

  • 在声明queue时,如果queue不存在则创建queue,如果存在则什么都不做。如果新声明的queue已存在但是声明的属性与原来queue的属性不一致,则将抛出channel级别的异常。

  • amqp可以自动生成queue名字,只要在声明时传入空值即可。

  • 创建临时queue(自动删除的queue)有3中方式:1.设置queue为exclusive(此时queue只能被声明它的connection使用,当connection关闭或者TCP连接断开之后,exclusive queue将自动删除,通常情况下exclusive queue的名字都有服务器自动生成),2.设置queue的TTL,3.设置auto-delete。

  • 可以对queue设置长度限制,可以对queue和message设置TTL。

  • 只有当queue设置了最大priority时,queue中的message的priority才起作用,官网建议的priority是从1到10。

  • 每一个consumer都有一个唯一标识,rabbitmq称其为consumer tag,通过consumer tag可以对consumer进行操作,比如取消某个consumer等。

  • 对于新添加的consumer,如果queue中有message,那么message将立即被送往consumer。

  • consumer通常在程序启动的时候创建,并且通常持续到程序结束。

  • 消息的push模式通过向queue注册一个consumer完成。

  • rabbitmq在投递消息给consumer时,会在message中加入以下property:

Property Type Description
Delivery tag Positive integer Delivery identifier, see Confirms.
Redelivered Boolean Set to true if this message was previously delivered and requeued
Exchange String Exchange which routed this message
Routing key String Routing key used by the publisher
Consumer tag String Consumer (subscription) identifier
  • publisher在发送message时可能加上以下message property,其中多数都是可选的:
Property Type Description Required?
Delivery mode Enum (1 or 2) 2 for "persistent", 1 for "transient". Some client libraries expose this property as a boolean or enum. Yes
Type String Application-specific message type, e.g. "orders.created" No
Headers Map (string => any) An arbitrary map of headers with string header names No
Content type String Content type, e.g. "application/json". Used by applications, not core RabbitMQ No
Content encoding String Content encoding, e.g. "gzip". Used by applications, not core RabbitMQ No
Message ID String Arbitrary message ID No
Correlation ID String Helps correlate requests with responses, see tutorial 6 No
Reply To String Carries response queue name, see tutorial 6 No
Expiration String Per-message TTL No
Timestamp Timestamp Application-provided timestamp No
User ID String User ID, validated if set No
App ID String Application name No
  • 在注册consumer到queue时,可以将该consumer设置为exclusive,表示该queue只能有一个consumer,如果queue中已经有其他consumer时,设置exclusive会失败。
  • 可以为consumer设置priority,priority高的consumer将优先获得message。
  • consumer需要自行处理消息投递过程中的所有异常,异常需要被日志记录下来,然后ignore掉。
  • 通常的client库中,都提供了thread pool用于异步的consumer操作。
  • queue和message都可以设置message的TTL。
  • 设置queue级别的messageTTL可以通过policy或者创建queue时指定message TTL参数x-message-ttl。
  • message级别的TTL可以在publish消息的时候通过expiration参数设置。
  • 当queue和message都设置有message的TTL时,较小的值生效。
  • 可以设置queue本身的TTL,通过policy或者声明queue时设置x-expires参数。
  • queue的最大长度可以通过最大消息数量或者最大字节数设置。
  • 通过声明时设置queue参数或者创建policy可以设置queue的长度,二者同时设置时较大者生效。只有ready message用于计算queue的长度,unacknowledged message不会参与计算。
  • 通过设置overflow参数可以控制queue达到最大长度时的行为,比如设置overflow=reject-publish时,会直接丢弃消息,此时如果publisher confirm启用,那么会通知publisher以nack消息,如果只有部分queue达到最大长度,那么同样会返回nack消息。
  • 可以在声明queue时设置overflow参数,主要有两种值:reject-publish和drop-head(默认)。
  • 在声明queue时,可以通过x-max-length参数指定queue的最大长度。
  • lazy queue即尽量将message放到磁盘中,只有当有consumer要消费时才从磁盘加载到内存中。lazy queue通常用于超长queue,比如有时consumer离线或者consumer处理速度远远小于进queue速度。
  • queue在持久化到磁盘时会阻塞,也即此时queue无法接收message,因此lazy queue会在很大程度上影响message接收效率。
  • 声明queue时,可以通过x-queue-mode参数指定是否为lazy queue,x-queue-mode有两个值,一个是default,一个是lazy,默认情况下为default。
  • 通过 channel.exchangeBind("destination", "source", "routingKey");可以设置exchange之间的绑定。
  • message在以下情况下可以被设置为放入死信exchange(DLX):1.消息被consumer拒绝接收(reject)并且requeue设置成了false;2.消息被consumer反面确认(nack)并且requeue设置成了false;3.由于TTL,消息过期;4.由于设置了queue的最大长度,消息被丢弃。
  • DLX只是普通的exchange,可以通过常规方式进行声明。
  • 可以通过policy或者声明queue时指定x-dead-letter-exchange参数来设置DLX,后者具有更高优先权。同时可以通过x-dead-letter-routing-key设置message放入DLX时的routing key,如果没有指定,那么message本身的routing key将被使用。
  • 客户端在nack或reject时可以指定requeue参数,表示该message是否应该重新放入queue中,或者丢弃/DLX掉。
  • nack和reject的区别,nack可以同时拒绝多条message,而reject只能拒绝一条消息。通常的做法是:将nack的消息重新放入队列(requeue),而将reject的消息直接丢弃或者DLX。
  • 声明queue时通过设置x-max-priority可以将queue设置为priority queue,然后在发送message时,可以通过basic.properties中的priority指定message的priority。
  • 在消费message时,可以通过指定 x-priority参数设置consumer的优先级,低优先级的consumer只有在高优先级的consumer被block后才能得到接收消息的机会。
  • 创建connection时,rabbitmq使用以下默认参数:
Property Default Value
Username "guest"
Password "guest"
Virtual host "/"
Hostname "localhost"
port 5672 for regular connections, 5671 for connections that use TLS
  • guest用户只能连接localhost。
  • channel和connection都是需要被close的,但是关闭connection会自动关闭channel。
  • 通过passive方式声明queue:channel.queueDeclarePassive();此时只有queue事先存在才成功,否则抛出channel级别异常,整个channel无法使用。
  • channe最好不要被多线程共享。
  • consumer可以通过delivery tag进行ack,某个channel创建的consumer只能通过该channel进行ack。
  • 被reject的message会被尽量放在先前的位置,如果不能则尽量放在里queue头部较近的位置。
  • 一直requeue将消耗很多资源,因此consumer需要自行记录requeue的次数,然后根据需求处理。
  • 当message被requeue时,message将多一个redelivered属性,如果再次消费失败,那么consumer可以根据该relelivered属性来决定应该继续requeue还是放入DLX。
  • 每次message被DLX,其中x-death中的count属性都会递增,可以通过该值来设置消息的最大retry次数。
  • publisher confirms 遵循以下原则:
    • an un-routable mandatory or immediate message is confirmed right after the basic.return;
    • otherwise, a transient message is confirmed the moment it is enqueued; and,
    • a persistent message is confirmed when it is persisted to disk or when it is consumed on every queue.
  • publisher confirm是异步的,confirm由connection的mainLoopThread线程完成,并不在publish线程完成。
  • Spring AMQP在publish的时候,如果connection无法创建,比如server不可达,那么rabbitmqtemplate将直接抛出异常。
  • In a cluster, nodes identify and contact each other using node names.
  • When a node starts up, it checks whether it has been assigned a node name. This is done via the RABBITMQ_NODENAME environment variable. If no value was explicitly configured, the node resolves its hostname and prepends rabbit to it to compute its node name.
  • For a CLI tool and a node to be able to communicate they must have the same shared secret called the Erlang cookie. The cookie is just a string of alphanumeric characters up to 255 characters in size. It is usually stored in a local file. The file must be only accessible to the owner (e.g. have UNIX permissions of 600 or similar). Every cluster node must have the same cookie.
  • Policies automatically match against exchanges and queues, and help determine how they behave. Each exchange or queue will have at most one policy matching。
  • Policies can be used to configure the federation plugin, mirrored queues, alternate exchanges, dead lettering, per-queue TTLs, and maximum queue length.
  • RabbitMQ is multi-tenant system: connections, exchanges, queues, bindings, user permissions, policies and some other things belong to virtual hosts, logical groups of entities.
  • The only concrete implementation we provide is CachingConnectionFactory, which, by default, establishes a single connection proxy that can be shared by the application. Sharing of the connection is possible since the “unit of work” for messaging with AMQP is actually a “channel”.
  • CachingConnectionFactory既能用于cache channel,也能用于cache connection factory。通过设置ConnectionFactory的cacheMode参数可以指定是cache channel还是factory。 each call to createConnection() creates a new connection (or retrieves an idle one from the cache). Closing a connection returns it to the cache。
  • RabbitTemplate的retry机制主要用于与broker的连接异常。
  • 通过设置RabbitTemplate的usePublisherConnection可以将publish和consumer的connection分开。

相关文章

  • RabbitMQ 学习笔记:安装

    RabbitMQ 学习笔记系列 上一系列:MySQL 学习实践 RabbitMQ 学习笔记:安装 RabbitMQ...

  • RabbitMQ 问答式总结

    RabbitMQ的学习笔记 关于RabbitMQ的几个角色如下: 关于名词的通俗解析: 首先我们肯定知道Rabbi...

  • 统一配置

    spring cloud学习笔记 上文 学习使用Docker下文 RabbitMQ的基本使用 由于 spring ...

  • RabbitMQ学习笔记

    RabbitMQ 简介 MQ 消息队列,上承生产者,下接消费者。从生产者侧获取消息,然后将消息转发给消费者。由此可...

  • RabbitMQ 学习笔记

    RabbitMQ 是一个支持多种消息传递协议的消息代理, 支持 AMQP(一个具有强大路由功能的开放式连接协议),...

  • rabbitmq学习笔记

    在Window7下安装rabbitmq 下载 otp_win64_17.3.exe 和 rabbitmq-serv...

  • Rabbitmq学习笔记

    什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...

  • RabbitMQ学习笔记

    本文作者:陈进坚个人博客:https://jian1098.github.io[https://jian1098....

  • RabbitMQ学习笔记

    安装 追加内容export PATH=$PATH:/data/install/erlang/bin

  • RabbitMQ学习笔记

    RabbitMQ简介 RabbitMQ是一个由erlang开发的AMQP(Advanved Mesaage Que...

网友评论

      本文标题:RabbitMQ学习笔记

      本文链接:https://www.haomeiwen.com/subject/hniauqtx.html