RabbitMQ

作者: ands999 | 来源:发表于2019-05-02 17:05 被阅读0次
    相关概念

    消息包含消息体和标签。在消息路由的过程中,消息的标签会被丢弃,存入到队列中的消息只有消息体。
    RabbitMQ中的消息存储在队列中。Kafka将消息存储在topic(主题)逻辑层面,而队列逻辑只是topic实际存储文件中的位移标识。

    排他队列

    排他队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于连接可见的,同一连接的不同信道是可以访问同一连接创建的排他队列;“首次”是指如果一个连接已经声明了一个队列,其他连接是不允许创建同名的排他队列;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,排他队列适用于一个客户端同时发送和读取消息的应用场景。
    预先创建资源并配合mandatory参数或者备份交换器可以提高程序的健壮性。
    RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则是调用Basic.Get进行消费。
    当调用与Consumer相关的API方法时,不同的订阅采用不同的消费者标签(ConsumerTag)来区分彼此,在同一个Channel中的消费者也需要通过位移的消费者标签以作区分。
    设置autoAck为false,然后在接收到消息之后进行显式ack操作,这样可以防止不必要的消息丢失。
    和生产者一样,消费者同样要考虑线程安全的问题。消费者的callback会被分配到与Channel不同的线程池上,这意味着消费者可以安全的调用channel上的方法。
    Basic.Consume将信道(channel)置为投递模式,直到取消队列的订阅为止。在投递模式期间,rabbitMQ会不断的推送消息给消费者,推送的消息个数受basic.Qos的限制。如果只想从队列获取单条消息而不是持续订阅,建议使用basic.get进行消费。而不是将basic.get放在循环中代替basic.consume,这样会严重影响rabbitMQ性能。

    mandatory和immediate

    mandatory和immediate是channel.basicPublish方法中的两个参数,他们都是当消息传递过程中不可达目的地时将消息返回给生产者。
    当mandatory参数设为true时,交换器在无法根据自身的类型和路由键找到一个符合条件的队列时,MQ会调用basic.return命令将消息返回给生产者。当设置为false时,则丢弃消息。

    备份交换器

    备份交换器,英文名称为alternate exchange,简称AE。备份交换器可以将未被路由的消息存储在MQ中,在需要的时候再去处理这些消息。

    过期时间(TTL)

    TTL,time to live的简称,即过期时间。可以对消息和队列设置TTL。

    死信队列

    DLX(Dead-Letter-Exchange)死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就是死信队列。
    下面几种情况会导致消息变成死信:

    • 消息被拒绝(reject,nack),并且设置requeue参数为false。
    • 消息过期
    • 队列达到最大长度
    延迟队列

    DLX配合TTL可以实现延迟队列。

    优先级队列
    RPC实现

    RPC的处理流程:

    • 当客户端启动时,创建一个匿名的回调队列(名称由MQ自动创建)
    • 客户端iRPC请求设置2个属性:replyTo用来告知RPC服务端回复请求时的目的队列,即回调队列;corelationId用来标记一个请求。
    • 请求被发送到rpc_queue队列中。
    • RPC服务端监听rpc_queue队列中的请求,当请求到来时,服务端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设置的回调队列。
    • 客户端监听回调队列,当有消息时,检查corelationId属性,如果与请求匹配,那就处理。
    持久化
    生产者确认

    确保消息到达broker的两种方式:

    • 事务机制。
    • 发送方确认机制。

    只有消息成功被RabbitMQ接收,事务才能提交成功,否则可以在捕获异常之后进行事务回滚,于此同时可以进行消息重发。
    生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会有一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Ack)给生产者(包含消息的唯一ID)。
    发送方确认机制的最大好处是其为异步的。当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack命令。
    事务机制和发送方确认机制是互斥的。
    事务机制和发送方确认机制确保的是消息能够正确发送至RabbitMQ的交换器,如果交换器没有匹配的队列,消息也会丢失。因此,还需配合mandatory参数或者备份交换器一起使用来提高消息传输的可靠性。
    批量confirm机制的问题在于遇到RabbitMQ服务端返回Nack后,需要重发批量消息而导致的性能降低。

    消息分发

    当RabbitMQ队列拥有多个消费者时,以轮询的分发方式发送给消费者。每条消息只会发送到订阅列表中的一个消费者。这种方式适合扩展,而且是专门为并发程序设计的。

    消息顺序性

    出现消息顺序混乱的情形:延迟队列、优先级队列、多个消费者共同消费
    解决乱序方法:在消息体内添加全局有序标识。

    消息传输保障

    RabbitMQ提供“最多一次”和“最少一次”两种保障级别。
    “最少一次”需要以下条件:

    • 消息生产者需要开启事务机制或者publisher confirm机制,以确保消息能到交换器。
    • 消息生产者要配合mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不被丢弃。
    • 消息和队列都需要进行持久化处理。
    • 消费者需要将autoAck设置为false。

    交换器其实是一个名称和绑定列表。当消息发布到交换器时,实际上是由信道将消息上的路由键同交换器的绑定列表进行比较,然后再路由消息。当创建一个新的交换器时,RabbitMQ就会将绑定列表添加到集群中的所有节点上。

    跨越集群的界限

    RabbitMQ可以通过3种方式实现分布式部署:集群、Fedetation和Shovel。这3种方式不是互斥的。可以互相配合。
    RabbitMQ集群对延迟非常敏感,应当只在本地局域网中使用。在广域网中应该使用Federation或者Shovel。
    Federation插件的目的是使RabbitMQ在不同的Broker节点之间进行消息传递而无须创建集群。

    RabbitMQ高阶

    存储机制

    持久层包含两个部分:队列索引(rabbit_queue_index,负责维护队列中落盘消息的信息)和消息存储(rabbit_msg_store,以键值对形式存储消息,所有队列共享)。rabbit_msg_store又可分为msg_store_persistent(负责持久化消息的持久化)和msg_store_transistent(负责非持久化消息的持久化)。
    最佳配备是较小的消息存储在rabbit_queue_index中,较大的消息存储在rabbit_msg_store中,此是由queue_index_embed_msgs_below来配置。rabbit_queue_index中的消息以顺序的段文件来进行存储,后缀为“.idx”。rabbit_msg_store中的消息是以追加的方式写入文件,后缀为“.rdq”。
    在存储消息时,RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息。
    在读取消息的时候,先根据消息的ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息。如果文件不存在或者被锁住,则发请求由rabbit_msg_store进行处理。
    执行合并的两个文件一定是逻辑上相邻的两个文件。执行合并时首先锁定这两个文件,并先对前面文件中的有效数据进行整理,再将后面文件的有效数据写入到前面的文件中,同时更新消息在ETS表中的记录,最后删除后面的文件。

    队列的结构

    通常队列由rabbit_amqqueue_process和backing_queue两部分组成,rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息,向消费者交付消息、处理消息的确认(包括生产者的confirm和消费者的ack)。backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。
    消息状态:alpha、beta、gamma、delta。
    消息的不同状态主要是因消息内容和消息索引在磁盘或者内存中的位置。
    普通的没有设置优先级和镜像的队列,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列Q1、Q2、Delta、Q3和Q4来体现消息的各个状态。
    当前内存中保存的最大消息数量(target_ram_count),如果alpha状态的消息数量大于此值,会引起消息状态的转换。
    消费者获取消息也会引起消息的状态转换。

    惰性队列

    惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到对应的消息时,才会被加载到内存中,这是为了能够支持更长的队列。
    惰性队列和普通队列相比,只需要非常少的内存。因此,如果消息是持久化的,建议搭配惰性队列使用。

    内存及磁盘告警

    当内存使用超过阀值或者磁盘剩余空间低于阀值时,RabbitMQ都会暂时阻塞(block)客户端的连接(connection)并停止接受从客户端发来的消息,以此避免服务崩溃。
    如果一个broker节点的内存或磁盘受限,都会引起整个集群中所有的connection被阻塞。

    流控

    从2.8.0版本开始,rabbitmq引入流控(flow control)机制来保证稳定性。流控机制是用来避免消息的发送速率过快而导致服务器难以支撑的情形。内存和磁盘告警相当于全局的流控,一旦触发会阻塞集群中所有的connection,而此流控是针对单个connection。
    erlang进程之间不共享内存,而是通过消息传递来通信,每个进程都有自己的进程邮箱。默认情况下,erlang没有对进程邮箱的大小进行限制。
    rabbitmq使用基于信用证算法的流控机制来限制发送消息的速率以解决上面的问题。
    处于flow状态的connection和处于running状态的connection并没有什么不同,这个状态只是说明相应的发送速率已到达最高。
    流控机制可以作用于connection、channel和队列,持久化存储,从而形成一个完整的流控链。对于处于整个流控链中的任意进程,只要该进程阻塞,上游的进程必定会被全部被阻塞。
    提升队列的性能:

    • 开启erlang的HiPE功能
    • 以多个rabbit_amqqueue_process替换单个rabbit_amqqueue_process。这样可以充分利用rabbit_reader或者rabbit_channel进程中被流控的性能。
    镜像队列

    将消息设置为持久化,并且对于队列的durable属性设置为true,这样可以保证消息发送的可靠性。
    通过publisher confirm机制能保证消息缓存已落盘。
    对于rabbitmq集群,在单点故障时,交换器和绑定关系能够保证服务,但是队列以及其上的消息却不行,这是因为队列进程及消息仅维持在单个节点之上,所以一个节点的失效表现为其对应的队列不可用。
    引入镜像队列机制,可以将队列镜像到集群中的其他broker节点之上,如果集群中的一个节点失效了,队列能自动切换到镜像中的其他节点上以保证服务的可用性。
    镜像队列极大的提升了rabbitmq的可用性和可靠性,提供了数据冗余备份、避免单点故障,建议为每个重要的队列配置镜像队列。

    网络分区

    当出现网络分区时,不同分区里的节点会认为不属于自身所在分区的节点都已经挂了,对于队列、交换器、绑定的操作仅对当前分区有效。rabbitmq从3.1版本开始自动探测网络分区,并且提供相应的配置来解决此问题。
    rabbitmq引入网络分区的原因是与其数据一致性复制原理有关,rabbitmq采用的镜象队列是一个环形的逻辑结构。如果出现网络抖动或故障,那么这个逻辑环的性能会大大降低。所以引入网络分区将异常的节点剥离出整个分区,以确保rabbitmq服务的可用性以及可靠性。

    参考

    《RabbitMQ实战指南》

    相关文章

      网友评论

          本文标题:RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/dylywqtx.html