美文网首页
RabbitMQ 术语与概念(同其他MQ区别所在)

RabbitMQ 术语与概念(同其他MQ区别所在)

作者: 严重思想跑偏患者 | 来源:发表于2019-04-24 10:04 被阅读0次

    RabbitMQ术语与概念

    之前发现RocketMQ不支持我们项目的需求,他的ACL是个半开发完状态的功能,故更换为RabbitMQ

    TIPS:从ActiveMQ用到RocketMQ再到RabbitMQ。。。。。

    老规矩,有了之前的概念,就不在说明为什么要有MQ这门技术了,直接看里面的术语:

    TIPS:我用过的三种MQ使用三种不同的协议,服了都,就不能来个统一的协议吗

    RabbitMQ通信概念图

    1. Borker

    消息队列代理服务器实体。与RocketMQ一个含义
    

    2. Channel(信道)

    多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。
    

    功能:定义Queue、定义Exchange、绑定Queue与Exchange、发布消息etc.

    3. Message

    消息由消息头和消息体组成。
    

    消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)etc。

    4. Routing Key(路由键)255B

    消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。
    

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。

    5. Binding(绑定)

    用于建立Exchange和Queue之间的关联。
    

    一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。

    6. Binding Key(绑定键)255B

    Exchange与Queue的绑定关系,用于匹配Routing Key。
    

    7. Queue(消息队列)

    内部对象,存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。
    

    它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。
    消息一直在队列里面,等待消费者连接到这个队列将消息取走。
    需要注意,当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收

    8. Exchange(交换器|路由器)

    提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。
    它指定消息按什么规则,路由到哪个队列。
    

    交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
    交换器有四种消息调度策略:

    TIPS: 调度策略与三个因素相关:Exchange Type(Exchange的类型),Binding Key(Exchange和Queue的绑定关系),消息的标记信息(Routing Key和headers)。

    1. Fanout(订阅模式|广播模式)
      Fanout

    交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。
    订阅模式与Binding Key和Routing Key无关,交换器将接受到的消息分发给有绑定关系的所有消息队列队列(不论Binding Key和Routing Key是什么)。类似于子网广播,子网内的每台主机都获得了一份复制的消息。
    Fanout交换机转发消息是最快的。

    1. Direct(路由模式)(默认)
      Direct

    精确匹配:只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。
    RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue
    (每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。
    所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange。

    1. Topic (通配符模式)

      Topic
      topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定
      • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
      • binding key与routing key一样也是句点号“. ”分隔的字符串
      • binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
    1. Headers(键值对模式)

      Headers
      根据发送的消息内容中的headers属性进行匹配。
      Headers不依赖于Routing Key与Binding Key的匹配规则来转发消息,
      交换器的路由规则是通过消息头的Headers属性来进行匹配转发的,类似HTTP请求的Headers。
      • 在绑定Queue与Exchange时指定一组键值对
      • 键值对的Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或all,代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)。
      • 当消息发送到Exchange时,交换器会取到该消息的headers,对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
      • Headers交换机的优势是匹配的规则不被限定为字符串(String),而是Object类型。
      • Note that headers beginning with the string x- will not be used to evaluate matches.

    9. producer|sender 的 ACK 和 RPC

    RabbitMQ中的RPC
    所有的MQ本质上都是基于异步的消息处理,这就是MQ技术的本质,致力于效率与系统间的松耦合。
    首先ActiveMQ消息发送:
    1. 同步发送:producer同步发送至Borker,阻塞等待响应。
    2. 异步发送:可以不设置回调函数。。。

    回忆一下RocketMQ的消息,RocketMQ的消息发送方式有三种:(可设置重试次数,和等待时间)

    1. 同步发送:同步发送就是指 producer 发送消息后,会在接收到 broker 响应后才继续发下一条消息的通信方式。(注意:响应是Borker的,不是消费者消费的那个响应)
    2. 异步发送:就是不等响应,(注意不等响应,不是没有响应,有回调接口定义的。)
    3. 单向发送:就是一种单方向通信方式,不等待回应,也没有回调。
      也就是说RocketMQ的producer没有根据消费者消费情况来控制消息发送的。

    然后就是现在的RabbitMQ:
    先大概猜一下,等待后续理解修正:
    只有一种发送方式:异步发送:可以指定回调确认机制,发送确认分为两步:(需要配合开启)

    1. 确认是否到达交换器 : 通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。
    2. 确认是否到达队列:通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)

    官网描述
    RabbitMQ的消息确认机制
    RabbitMQ(四)消息确认(发送确认*)
    RabbitMQ(五)消息发送失败后的处理
    另一种模式:RabbitMQ的RPC,机制:其实这就跟RocketMQ的异步发送差不多,只不过RocketMQ是Broker响应,RabbitMQ是Consumer响应。

    1. 生产者发送消息时,在消息的属性(MessageProperties,在AMQP协议中定义了14个属性,这些属性会随着消息一起发送)中设置两个属性值replyTo(一个Queue名称,用于告诉消费者处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,消费者处理完成后需要将此属性返还,生产者将根据这个id了解哪条请求被成功执行了或执行失败)。
    2. 消费者收到消息并处理。
    3. 消费者处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。
    4. 生产者之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。

    10. consumer|receiver 的 ACK

    Message acknowledgment  消息回执
    

    大体与其他MQ基本类似:ACK都应该分两部分,一部分是Producer与Borker之间的,另一部分是Borker和Consumer之间的。主要是Borker与Consumer之间的。
    所有的MQ都是认为:对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了。
    ActiveMQ 的ACK:
    协议模型:STMOP

    • ACK模式ACK_MODE分为五种:
      • AUTO_ACKNOWLEDGE = 1 自动确认
      • CLIENT_ACKNOWLEDGE = 2 客户端手动确认
      • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
      • SESSION_TRANSACTED = 0 事务提交并确认
      • INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认
    • ACK_TYPE(Client与broker交换的ACK指令)
    DELIVERED_ACK_TYPE = 0    消息"已接收",但尚未处理结束
    STANDARD_ACK_TYPE = 2    "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
    POSION_ACK_TYPE = 1    消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
    REDELIVERED_ACK_TYPE = 3    消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
    INDIVIDUAL_ACK_TYPE = 4    表示只确认"单条消息",无论在任何ACK_MODE下    
    UNMATCHED_ACK_TYPE = 5    BROKER间转发消息时,接收端"拒绝"消息
    
    • 创建Session时指定ACK模式,ACK模式将是session共享的,意味着一个session下所有的 consumer都使用同一种ACK模式。
    • Client端指定了ACK模式,但是在Client与broker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,不同的ACK_TYPE将传递着消息的状态,broker可以根据不同的ACK_TYPE对消息进行不同的操作。
    • 具体ACK策略需要了解不同的ACK_MODE不同的消费模式来区分,我只用过CLIENT_ACKNOWLEDGE的模式。

    RocketMQ 的ACK:
    协议模型:自定义? 我记忆中没见过RocketMQ使用的是什么协议通讯模型。
    只在PushConsumer中有实现的(集群模式才有消息自动重试机制)

    • 只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
    • 指定重试次数RocketMQ会把这批消息重发回Broker(topic不是原topic而是'%RETRY%topic',代码层次不需理会),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。
    • 一批次的批量消息中间失败会保留ackIndex值。context对象的成员变量ackIndex的含义是:最后一个正常消费的消息索引号。
    • 重试一定次数后(默认16),还失败就会放到死信队列中,人工干预。


      RocketMQ 的ACK失败重发队列名

    RabbitMQ 的ACK:
    协议模型:AMQP

    • ACK确认模式 (String basicConsume(String queue, boolean autoAck, Consumer callback))
    • autoAck当然意思就是值为true就是自动应答。表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
    • autoAck设置为false手动应答,
      • 成功消费调用:void basicAck(long deliveryTag, boolean multiple) 来告诉消息服务器来删除消息。
      • 消费失败: void basicReject(long deliveryTag, boolean requeue) requeue true 重新放回队列头;false 放弃
      • 批量取消:void basicNack(long deliveryTag, boolean multiple, boolean requeue)

    11. Durability

    实际上就是要求MQ服务器重启不会造成数据的丢失。  
    

    好吧,还是三个都说:
    AMQ的持久化:

    • AMQ默认就是持久化方式,如果想要非持久化消息,则producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    • AMQ持久化方式:JDBC、AMQ、KahaDB、LevelDB。
    • 现在默认KahaDB,比AMQ更小更快。基于日志文件,索引和缓存。
    • 据说LevelDB更快,常与Zookeeper整合使用。但是我在官网看见写着已经被弃用了。。。。。

    RocketMQ的持久化:

    • RocketMQ的消息都是持久化的,使用文件log方式,利用Linux文件系统内存cache来提高性能。
    • 重点在于集群部署 单master,多master,多master多slaver
    • 集群复制:异步复制、同步双写
    • 刷盘方式:同步刷盘、异步刷盘:就是先ACK还是先写log的问题。
    • commitlog就是来存储所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。
    • consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据。

    RabbitMQ的持久化:

    • Rabbit MQ默认是不持久化。
    • 持久化操作:
      • 将交换器/队列的durable属性设置为true,这让queue和exchange都是持久化的。
      • 在消息发布前,设置delivery mode 为 2 ,这一步才会让消息也持久化。
    • 如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
    • 这种持久化策略依然有小概率事件丢失数据,(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务(下面再讲)。

    RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应(如果消息路由到了非持久队列,它会自动从持久化日志中移除)。一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前RabbitMQ重启,那么Rabbit会自动重建交换器和队列(以及绑定),并重播持久化日志文件中的消息到合适的队列或者交换器上。

    12. 消息分发模式

    这里都简单介绍一下,这是主要机制区别所在,说起来比较麻烦。
    

    AMQ:
    非常复杂难以描述。推荐一个博客
    ActiveMQ 消息消费模式

    示例图
    图与博客配合学习效果更好哦?
    • messageGroup概念:对消息分组,相同JMSXGroupID的消息会被分发到相同的consumer。负载均衡的一种高效机制。(queue)缓存命中这种。
    • topic的负载均衡:VirtualTopic
    • 筛选数据方式:MessageConsumer createConsumer( Destination destination, String messageSelector, boolean noLocal )语法是SQL-92的子集。

    RocketMQ:

    • 消费模式:
      • 集群模式:一个Consumer Group中的各个Consumer实例分摊去消费消息。
        • 实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
        • 这种模式下,消费进度的存储会持久化到Broker。
      • 广播消费:消息将对一个Consumer Group下的各个Consumer实例都投递一遍。
        • 实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
        • 这种模式下,消费进度会存储持久化到实例本地。
    • 消息过滤限制:Topic、Tag、SQL message指定topic,可以附带tag和property
    • 一个Consumer Group指定topic,支持一种消息过滤模式,比如指定接收tag为“*”,或通过SQL92来筛选property。
    • consumer支持pull和push两种,只有push支持SQL92。

    RabbitMQ:

    • RabbitMQ中没有各种group的概念,
    • 根据Exchange的消息调度策略来匹配消息。
    • 它的一个queue就是一组数据,它的负载均衡策略:(prefetchCount来限制Queue每次发送给每个消费者的消息数)
      • 轮询分发(Round-robin):队列给每一个消费者发送数量一样的数据。
      • 公平分发(Fair dispatch):消费者设置每次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据。channel.basicQos(prefetchCount);
    • 虽然据说也是支持pull和push两种方式,但我看到所有的都是String basicConsume(String queue, boolean autoAck, Consumer callback)。

    13. 用户 vhost

    关于用户权限:
    

    AMQ:

    • 支持用户名密码认证:activemq配置文件中plugin定义用户、组信息,其中groups信息用于验证授权,通过jaas进行用户的授权。
    • 用户名密码的方式比较简单,网上一搜一大堆。
    • 关于自定义权限认证:采用plugin方式扩展方法,可自定义权限认证。
      ActiveMQ-自定义用户验证

    RocketMQ:
    现在最新版本4.4,GitHub上的开发版本4.5,在GitHub和官网中并没有找到任何关于权限用户之类的产品特色。

    这里有个误区,找RocketMQ的文档时,在wiki上发现介绍说拥有 “访问权限控制”的功能,实际上在4.4版本中,它的ACL模块只定义了接口,并没有具体实现。
    我试过把GitHub上最新版本4.5关于ACL的代码clone下来,结果表明现在框架有bug,根本没实现完。
    还有在网上看到阿里云的RocketMQ写着4.4版本支持权限认证之类的, 那是阿里云的产品,不是这个Apache的开源框架。

    RabbitMQ:
    RabbitMQ通过用户名密码和vhost控制
    用户:

    • 管理:命令就不说了,页面登录一看就明白,默认超级管理员guest,只能localhost。
    • 用户角色:
      • 其他(none):
      • Admin:可登陆console查(Users、Vhost、policies、Limits、Cluster)。
      • 监控员(Monitoring):可登陆console查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)。
      • 策略制定者(Policymaker):对policy进行管理。?现在还没搞清楚这是什么东西
      • 普通管理者(Management):所拥有权限的vhost的链接消息统计信息。与队列之类的实际消息信息。
      • 模仿者(Impersonator):这是新版出现的新角色。不知道什么?都没看到有文档说明
      • 其他:无法登陆console,就是普通的生产者和消费者。
    • 一般情况下,最好让管理员和AMQP链接账号区分开

    vhost:RabbitMQ自有概念:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

    • vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、绑定、交换器和权限控制;
    • vhost通过在各个实例间提供逻辑上分离,允许你为不同应用程序安全保密地运行数据;
    • vhost是AMQP概念的基础,必须在连接时进行指定,RabbitMQ包含了默认vhost:“/”;
    • 当在RabbitMQ中创建一个用户时,用户通常会被指派给至少一个vhost,并且只能访问被指派vhost内的队列、交换器和绑定,vhost之间是绝对隔离的。
    • 当我们在创建用户时,会指定用户能访问一个虚拟机,并且该用户只能访问该虚拟机下的队列和交换机,如果没有指定,默认的是”/”;一个rabbitmq服务器上可以运行多个vhost,以便于适用不同的业务需要,这样做既可以满足权限配置的要求,也可以避免不同业务之间队列、交换机的命名冲突问题,因为不同vhost之间是隔离的。

    rabbitmq的权限控制通过两层来实现,一是vhost的权限,二是确认有权限访问vhost后,对vhost内资源的权限控制(配置,读,写)

    用户权限指的是用户对exchange,queue的操作权限,包括配置权限,读写权限。
    配置权限会影响到exchange,queue的声明和删除。
    读写权限影响到从queue里取消息,向exchange发送消息以及queue和exchange的绑定(bind)操作。

    “---” 双引号中的信息为正则表达式
    ".*" 表示配置任何队列和交换机
    "checks-.*"表示只能配置名字以"checks-"开头的队列和交换机
    " " 不匹配队列和交换机
    
    • 目前没找到有什么自定义权限认证的方式。

    14. 事务

    TTL

    QLL

    DLE

    PQ

    相关文章

      网友评论

          本文标题:RabbitMQ 术语与概念(同其他MQ区别所在)

          本文链接:https://www.haomeiwen.com/subject/wgjwgqtx.html