美文网首页
RabbitMQ消息队列

RabbitMQ消息队列

作者: 谭英智 | 来源:发表于2021-12-04 18:00 被阅读0次

消息队列提供消息的接收,消息的路由,和消息的发送功能。

它可以为大型系统解耦,提供异步处理能力和工作队列的能力

特性

围绕着消息队列的功能,rabbit mq提供以下特性

  • 可靠性或高性能

    用户可以选择是否持久化消息和队列来提高可靠性

    投递确认特性保证消息被正确的消费

    发布者证实机制保证消息发布的安全性

    通过镜像提供高可用,避免消息队列单点问题

  • 灵活的路由

    Rabbit MQ提供非常多样的路由选择,可以满足应用的大部分场景

  • 集群,高可用

    通过多个MQ服务器联合工作,并提供镜像功能保证集群的高可用和高性能

协议

Rabbit MQ使用AMQP协议来作为消息队列的协议

AMQP

简介

高级消息队列协议,是一种网络协议

它包括发布者,消费者和消息代理

mq-amqp-simple

发布者向消息队列投递消息

消息的某些属性会被消息代理利用,作为路由的特性,而其他属性会被直接透传给消费者

由于网络是不可靠的,消费者有可能处理失败,AMQP提供一个消息确认的机制,只有消息被确认了,才会从消息队列删除。消息确认可以是自动或者由消费者发送回执确认

某些情况下,消息有可能无法被成功的路由,消息或许返回给发布者并被丢弃,或者如果消息代理执行了延期操作,消息会被放入一个死信队列中

消息路由规则

  • Direct exchange(直连交换机) : (Empty string) and amq.direct
mq-direct

用于单播

发布者的消息携带路由键R,会被发送到所有绑定了路由键R的消费队列

可以实现相同路由键R的消费者的负载均衡

  • Fanout exchange(扇型交换机) : amq.fanout
mq-fanout

用于广播

  • Topic exchange(主题交换机) : amq.topic

用于多播,通过路由键和队列来多播

  • Headers exchange(头交换机) : amq.match (and amq.headers in RabbitMQ)

它是直连交换的另一种模式

不同之处是直连交换只通过路由键来路由

而头交换则通过更多自定义的头属性,来路由消息

队列

  • Name:名字
  • Durability:消息代理重启后,交换机是否还存在,但消息体不会被持久化
  • Exclusive:只能被一个连接使用,连接关闭后,队列自动删除
  • Auto-delete:当最后一个消费者退订后,队列被删除

队列需要被声明(declare)后才能被使用

队列可以被多次声明,但如果声明的参数不一致,会返回错误

消费者

消费有两种模式:

  • push API,消息订阅,被动消费
  • pull API,根据需要,主动消费
拒绝消息

当消费者消费某条消息失败后,可以发送拒绝消息给消息代理

消息代理或许把消息销毁或者再次投入消息队列

预取消息

多个消费者共享一个队列的案例中,可以配置在收到一个消息确认后,每个消费者可以接收多少条消息

它可以提高系统的吞吐量

消息

  • content type:内容类型
  • Content encoding:内容编码
  • Routing key:路由键
  • Delivery mode:是否持久化
  • Message priority:消息优先级
  • Message publishing timestamp:消息发布时间错
  • Expiration period:消息有效期
  • Publisher application id:发布应用的ID

连接

通过TCP SSL来保护连接

通道

每个线程都开一个连接是不高效的

可以为整个应用开一个channel

各个线程通过共享这个channel的连接,来提升连接的利用率

脑裂问题

当部署了两个节点的rabbitmq的时候,如果这两个节点的网络不通或者不稳定的时候,两个节点会自动选举自己为master,并且各自工作。

等网络重新变好时,两个节点不会重新成为集群,并造成消息消费异常,需要手工解决

自动解决的方法:

配置 cluster_partition_handling为以下的一种模式

  • ignore

    当出现脑裂时,需要人工恢复

  • pause_minority

    启动奇数个mq实例,如果部分网络不可用时,少数派(小于半数的节点数)的实例会自动关闭,多数派会继续正常工作。并可以自动恢复集群

  • pause_if_all_down

    语法为{pause_if_all_down, [nodes], ignore|autoheal}

    当某个节点检测其他所有节点都不可用时,自动关闭自身。并通过ignore或者autoheal来恢复集群

    这适用于偶数个节点的情况

    例如两个机房分别部署了两个mq实例,但机房不通时,各机房的mq依然可以继续工作,适用于4个mq实例的情况

  • autoheal:当出现脑裂的时候,Rabbitmq会选取一个获胜的分区作为master。获胜的评判规则是:连接较多的一个、顺序优先

丢数据问题

  • rabbitmq收到消息,暂存在内存中,还没消费,就挂了,造成消息丢失
  • 消费者消费了消息,但还没来得及处理,就挂了,rabbitmq以为消费者已经处理,但实际没有被处理,造成消息丢失

解决方法

  • 发布者

    开启事务,如果消息没有被mq接收,则回滚,并重新发送消息(同步,性能低)

    适用confirm机制(异步),确定消息是否正确收到,mq可以异步通知,但会造成消息乱序的问题

  • 对于MQ

    • 消息持久化

      exchange持久化

      queue持久化

      message持久化

    • 开启集群镜像

      HA模式,可以选择同步给所有节点,同步最多N个节点和之同步给符合条件的节点

      缺点是吞吐量会下降,当可用性会增大

    • 消息补偿机制

      可以通过数据库来做备用存储,来保证消息的正确性和可用性

  • 消费者

    ACK确认机制

引用:

http://rabbitmq.mr-ping.com/

相关文章

网友评论

      本文标题:RabbitMQ消息队列

      本文链接:https://www.haomeiwen.com/subject/jpqbxrtx.html