美文网首页工作
RocketMQ 简介

RocketMQ 简介

作者: 左师兄zuosx | 来源:发表于2021-07-05 11:35 被阅读0次

    RocketMQ 前世今生

    RocketMQ在阿里内部叫做Metaq(最早名为Metamorphosis,中文意思变形记,是作家卡夫卡的中篇小说代表作,可见是为了致敬Kafka)。

    RocketMQ是Metaq3.0之后的开源版本。

    Metaq在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。并平稳支撑了历年双十一大促(万亿级的消息),在性能、稳定性、可靠性等方面表现出色,在整个阿里技术体系和大中台战略中发挥着举足轻重的作用。

    Metaq最终源于Kafka,早起借鉴了Kafka很多优秀的设计。但是由于Kafka是Scale语言编写而阿里系主要使用Java,且无法满足阿里的电商、金融业务场景,所以誓嘉(花名)团队用Java重新造轮子,并做了大量的改造和优化。

    在此之前,淘宝有一款消息中间件名为Notify,目前已经逐步被Metaq所取代。

    第一代的Notify主要使用了推模型,解决了事务消息;第二代的MetaQ主要使用了拉模型,解决了顺序消息和海量堆积的问题。相比起Kafka使用的Scale语言编写,RabbitMQ 使用Erlang语言编写,基于Java的RocketMQ开源后更容易被广泛的研究,以及其他大厂定制开发。

    image-20210706094411868.png

    RocketMQ 部署架构

    image-20210706094533660.png
    • NameServer

      管理Broker。如:各个邮局的管理机构

      NameServer 是一个无状态节点,可集群部署,节点之间无任何信息同步。

    • Broker

      暂存和传输消息。如:邮局

      Broker 部署相对复杂,Broker 分为 Master 和 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示为 Master ,非 0 表示 Slave 。Master 也可以部署多个。

      每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。

      注意:当前 RocketMQ 版本在部署架构上支持一 Master 多 Slave,但只有 BrokerId = 1 的从服务器才会参与消息的读负载。

    • Topic

      区分消息的种类。一个发送者可以发送消息给一个或多个Topic。一个消息的接受者可以订阅一个或者多个Topic消息。

    • Message Queue

      相当于是Topic的分区。用于并行发送和接收消息。

    • Producer

      消息发送者。如:发信者

      Producer 完全无状态,可集群部署

      Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。

    • Consumer

      消息接收者。如:收信者

      Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。

      Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从 Master 还是 Slave 拉取。

    执行流程:

    1. 启动 NameServer,NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

    2. Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer 集群中就有 Topic 跟 Broke r的映射关系。

    3. 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。

    4. Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。

    5. Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

    RocketMQ 特性

    • 订阅与发布

      消息的发布是指某个生产者向某个 Topic 发送消息,消息的订阅是指某个消费者关注了某个 Topic 中带有某些 Tag 的消息。

    • 消息顺序

      消息有序指的是消息消费时,能按照发送的顺序来消费。

      如:一个订单产生了三条消息,分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

    • 消息过滤

      RocketMQ 的消费者可以 根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的。

      优点:减少了对于 Consumer 无用消息的网络传输。

      缺点:增加了 Broker 的负担,而且实现相对复杂。

    • 消息可靠性

      RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:

      1)Broker 非正常关闭。

      2)Broker 异常 Crash。

      3)OS Crash。

      4)机器掉电,但是能立即恢复供电情况。

      5)机器无法开机(可能是CPU、主板、内存等关键设备损坏)。

      6)磁盘设备损坏。

      1)、2)、3)、4)这四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)

      5)、6)属于点点故障,且无法恢复,一旦发生,在此单节点上的消息全部丢失。

      RocketMQ 在这两种情况下,通过异步复制,可保证99%的消息不丢失,但是任然会有极少量的消息可能丢失。

      通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。

      注:RocketMQ从3.0版本开始支持同步双写。

    • 至少一次

      至少一次(At least Once)指每个消息必须投递一次。Consumer 先 Pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费者一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

    • 回溯消息

      回溯消息是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息任然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费1小时前的数据,那么 Broker 需要提供一种机制,可以按照时间维度来回退消费进度。

      RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

    • 事务消息

      RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。

      RocketMQ 的事务消息提供类型 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致性。

    • 定时消息

      定时消息(延迟队列)是指消息发送到 Broker 后,不会立即被消费,等待特定时间投递给真正的 Topic。

      Broker 有配置项 messageDelayLevel,默认值为 “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。

      messageDelayLevel是 Broker 的属性,不属于某个 Topic。发送消息是,设置 lelayLevel等级即可:msg.setDelayLevel(level)。level 有三种情况:

      • level == 0,消息为非延迟消息。

      • 1 <= level <= maxLevel,消息延迟特定时间,例如 level == 1,延迟1s

      • level > maxLevel ,则 level == maxLevel,例如 level == 20,延迟2h

      定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 Topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。Broker会调度地消费 SCHEDULE_TOPIC_XXXX ,将消息写入真实的 Topic。

      需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

    • 消息重试

      Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:

      • 消息本身的原因

        例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。

      • 依赖的下游应用服务不可用

        例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

    • 消息重投

      生产者在发送消息时:

      • 同步消息失败会重投

      • 异步消息由重试

      • oneway没有任何保证

      消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。

      如下方法可以设置消息重试策略:

      1. retryTimesWhenSendFailed

        同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢失。超过重投次数,抛异常,由客户端保证消息不丢失。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。

      2. retryTimesWhenSendAsyncFailed

        异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。

      3. retryAnotherBrokerWhenNotStoreOK

        消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

    • 流量控制

      生产者流控,因为 Broker 处理能力达到瓶颈。

      消费者流控,因为消费能力达到瓶颈。

      • 生产者流控

        1)commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为1000ms,发生流控。

        2)如果开启 transientStorePoolEnable = true,且Broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,发生流控。

        1. Broker每隔10ms检查send请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,发生流控。

        2. Broker通过拒绝 send 请求方式实现流量控制。

        注意:生产者流控,不会尝试消息重投。

      • 消费者流控

        1)消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。

        2)消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。

        3)消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。

        4)消费者流控的结果是降低拉取频率。

    • 死信队列

      死信队列用于处理无法被正常消费的消息。

      当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

      RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

      在 RocketMQ 中,可以通过使用 console 控制台对死信队列的消息进行重发来使得消费者实例再次进行消费

    RocketMQ 消费模式

    RocketMQ 消息订阅有两种模式,一种是Push模式(MQPushConsumer),即MQServer主动向消费端推送;另外一种是Pull模式(MQPullConsumer),即消费端在需要时,主动到MQ Server拉取。但在具体实现时,Push和Pull模式本质都是采用消费端主动拉取的方式,即 Consumer 轮询从 Broker 拉取消息。

    • Push 模式

    优点:就是实时性高。

    缺点:在于消费端的处理能力有限,当瞬间推送很多消息给消费端时,容易造成消费端的消息积压,严重时会压垮客户端。

    • Pull 模式

      优点:主动权掌握在消费端自己手中,根据自己的处理能力量力而行。

      缺点:如何控制 Pull 的频率,定时间隔太久影响时效性,间隔太短担心做太多“无用功”浪费资源。

      比较折中的办法就是长轮询。

    Push 与 Pull 区别:

    Push 方式里,Consumer 把长轮询的动作封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

    Pull 方式里,取消息的过程需要用户自己主动调用,首先通过打算消费的 Topic 拿到 MessageQueue 的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

    RocketMQ 使用长轮询机制来模拟 Push 效果,算是兼顾了二者的优点。

    RocketMQ 相关术语

    • 消息模型(Message Mode)

      RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。MessageQueue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

    • Producer

      消息生产者,负责产生消息,一般由业务系统负责产生消息。

    • Consumer

      消息消费者,负责消费消息,一般是后台系统负责异步消费。

    • PushConsumer

      Consumer 消费的一种类型,该模式下 Broker 收到数据后会主动推送给消费端。应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。该消费模式一般实时性较高。

    • PullConsumer

      Consumer消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

    • ProducerGroup

      同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

    • ConsumerGroup

      同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费

      (Broadcasting)。

    • Broker

      消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为Provider。

    • 广播消费

      一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。

      在 CORBA Notification 规范中,消费方式都属于广播消费。

      在 JMS 规范中,相当于 JMS Topic( publish/subscribe )模型

    • 集群消费

      一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 Consumer Group 有 3 个实例(可能是 3 个进程,或者3台机器),那举每个实例只消费其中的 3条消息。

    • 顺序消息

      消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。

    • 普通顺序消息

      顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列总数发生发化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

    • 严格顺序消息

      顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。 如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现)

      目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。

    • Message Queue

      在 RocketMQ 中,所有消息队列都是持久化的,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset 为 java long 类型,64 位,理论上在 100 年内不会溢出,所以认为为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组,offset 就是下标。

    • 标签(Tag)

      为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

    相关文章

      网友评论

        本文标题:RocketMQ 简介

        本文链接:https://www.haomeiwen.com/subject/qzqgultx.html