背景
目前笔者所在团队正常研发一款流程编排引擎,其中有多个功能特性需要MQ的延迟消息/消费者重试等特性。经过多方面的考量,我们最终决定采用计算存储分离的架构,在分布式KV存储的基础上,研发一款定制化的MQ。目前,其具备了MQ的主要特性。本文所描述的是基于分布式KV的基础上研发MQ的核心思路。
术语
- Message: 消息
- Topic: 消息的逻辑分类
- Partition: 分区,一个Topic中包含多个分区,每个消息最终发送到Topic的某个分区中
- Partition Offset: 每当消息被发送到某个Partition中,这个Partition的Offset+1
- Producer: 生产者,发送消息到某个Topic下的某个分区
- Consumer Group: 消费者组。一个Topic可以有多个不同的Consumer Group进行消费,每个Consumer Group可以消费到Topic中的全量消息。
- Consumer: 消费者,消费Topic中的消息。如果一个Consumer Group下只有一个Consumer,则其会消费到全部消息;如果有多个Consumer,则每个Consumer只消费部分消息。
- Consumer Group Offset: 记录消费者组,针对某个Topic下所有Partition当前消费到位置。针对每个Partition,不会超过Partition 最大Offset。
- Broker: 消息代理,Producer将消息发送给Broker,由broker发送到topic下某个分区。Consumer连接Broker,从Broker消费消息。
- Broker Cluster: Broker集群,为了实现高可用,每个Broker集群下包含多个Broke实例
- Rebalance: 再均衡。传统的实现中,一个Consumer Group下的Consumer数量不能超过这个Topic下Partition的数量,一个Partition最多只能分配给一个消费者,超出Partition数量的消费者无法消费到消息。在本文实现中,不存在Rebalance概念。消费者数量,不受Partition限制。
- Delay Message: 延迟消息。通常情况下,一个消息被投递到Topic中,就会被立即消费。延迟消息的意思是,延迟指定时间后,才可以被消费。
- Retry Message: 重试消息。当一条消息消费失败后,需要被重试,即按照一定的重试策略,重新让消费者来消费这条消息。
整体架构

说明:
KV 存储
本文以KV Storage是Redis Cluster为例进行讲解,Broker Cluster中的所有Broker实例共享这个Redis Cluster,其他任意支持按key排序scan的KV存储均可。
从持久化的角度来说,使用内存模式Redis并不合适,这里意图在于说明基于分布式KV实现的MQ的核心原理。事实上,在公司内部,我们使用的是基于RocksDB基础上研发的分布式KV,在网络通信上兼容redis协议。
为了简化,本文中不讨论Redis Cluster扩/缩容,Slot迁移的情况。但足以掌握基于分布式KV研发一款消息中间件的核心原理。
网络通信
Producer和Consumer与Broker的通信,在笔者的项目中,使用的是Grpc。在开源的通信框架中,Grpc可以说是最流行的方案,Apacha RocketMQ 5.x版本也采用了Grpc。在本文中,并不会对Grpc进行介绍。
详细设计
Broker集群元数据
每个Broker启动时,可以将自身信息注册到Redis中,以便producer/consumer进行服务发现。例如通过hash结构维护:
Key
[cluster]$cluster_name
复制代码
Value
filed value
broker1 ip:port
broker2 ip:port
复制代码
Topic元数据
Topic元数据主要是维护Topic下有多少Partition,这些Partition在Redis Cluster中是如何分布的。用户在创建Topic时,指定分区数量。
Redis Cluster有16384个槽,每个Redis分片负责其中部分槽。当创建一个Topic时,例如指定10个分区,可以按照一定策略把这个10个分区映射到不同的槽上,相当于间接的把分区分配到了不同的redis分片上。
当创建好一个Topic之后,将Topic下的分区分配给不同的Broker。例如10个分区,10个Broker,则每个Broker负责一个分区。如果只有5个分区,那么需要分配给其中5个broker。
例如通过hash结构维护维护这个映射关系
key
[topic_metadata]$topic_name
value
filed value
partition1 broker1
partition2 broker2
消息
消息使用protobuf进行定义:
message Message{
google.protobuf.Struct metadata = 1; //消息的元数据
string partition = 2; //消息所属的分区
int64 offset = 3; //消息的offset
string msgId = 4; //消息的唯一id
string topic = 5; //消息的topic
string key = 6; //消息key,用于路由
bytes body = 7; //消息体
google.protobuf.Timestamp born_time = 8; //消息生成时间
google.protobuf.Timestamp expireTime = 9; //消息截止时间,用于延迟消息
}
生产者在发送消息时,最简单的情况下,只需要指定消息的topic、body。当有其他特殊需求时,可以指定以下字段:
- key:具有相同key的消息,经过hash算法,写入到相同的分区。
- partition:直接指定分区,不根据key计算。
- expireTime:延迟消息。消息不希望被立即消费,而是到指定时间后,才会被消费。
消息发送
从Producer的角度来说:
- 重试: 发送一条消息到broker可能失败,所以需要重试。重试需要设置一定的次数和超时时间,在超时时间内进行重试。
- 分区选择: 选择分区应该在producer端确定,确定分区后,消息发送到分区所属的broker。
- 聚合: 为了减少网络io,应该聚合批次进行发送,注意聚合是按照分区进行聚合
- broker选择: 对于无序消息,选择broker可以有一定的策略,例如某个broker失败率比较高,或者延迟比较高,则应该优先选择其他的broker。
从broker角度来说:
接收到一条消息时,offset信息维护。每次发送,在确定消息需要发送到的分区后,broker需要将对应partition的offset+1。在笔者的项目中,使用了hash结构存储每个分区的最大的offset:
key:
[topic_offset]{$topic}
value
field value
partition 1 offset1
partition 2 offset2
为了提升offset维护的效率,不需要每次都调用HINCRBY,而是在broker启动时,将自己维护的分区offset信息加载到内存中,之后发送消息时,内存中增加,定期保存到KV中。
此外,需要有一个修正offset的逻辑,避免broker异常宕机的情况下,offset没有成功保存到redis中。在broker启动时,可以从当前维护的最大offset开始往后扫描,如果发现了新消息,则说明offset需要修正(参考如下消息存储部分)。
消息存储
当消息被写入到redis中,key满足以下格式:
[topic]{$topic_$partition}$offset
其中:
- [topic]:是固定前缀
- {topic_partition}:<math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>t</mi><mi>o</mi><mi>p</mi><mi>i</mi><mi>c</mi><mtext>是</mtext><mi>t</mi><mi>o</mi><mi>p</mi><mi>i</mi><mi>c</mi><mtext>名称,</mtext></mrow><annotation encoding="application/x-tex">topic是topic名称,</annotation></semantics></math>topic是topic名称,partition表示分区。这里利用了redis hash tag能力。
- $offset:表示当前这个分区的offset
消息拉取
拉取通过redis scan操作进行,将scan到的消息交由消费者处理。
在拉取消息时,依赖于一个consumer offset,其维护了某个consumer group消费某个topic的进度信息。拉取时,从这个位置开始。这里可以考虑使用hash数据结构:
key:
[consumer_offset]$group_$topic
value
field value
partition 1 offset1
partition 2 offset2
当有消费者连接上某个broker时,broker查询到自己负责的分区的parititon offset,从这个位置开始拉取消息。
延迟消息
对于所有延迟消息,会首先发送到一个特殊的delay topic中,相当于暂存这个消息。消息到期后,投递到目标topic中。
- 在发送到延迟topic之前,会记录消息的原始topic、partition到metadata的
origin_topic
、origin_partition
字段中。之后发送到delay topic中。key格式与普通消息不同,以时间戳排序:
[delay]$broker_id}$expireTime
- 会有一个延迟消息转发器,不断的扫描abase,当发现有消息到期时,修改延迟消息中的目标topic为
origin_topic
、origin_partition
字段,之后从发送逻辑,投递到目标topic中。 - 同时,会记录当前扫描到的位置。
消费重试
当消费者消费一条消息失败时,默认也是会走延迟消息逻辑,到期后,投递给目标消费者,重新消费。重试消息,也是基于延迟消息的基础上开发的。
在逻辑上不同的是:
- 延迟消息直接投递给了目标topic
- 而重试消息不能投递给你目标topic,因为一个topic有多个consumer group,如果只是某个consumer group失败需要重试,那么其他consumer group应该不受影响。因此每个consumer_group,应该有独立的重试topic。如:
[topic]retry_$consumer_group
这个Topic下应该也有包含分区,策略与之前所述Topic元数据维护相同。
死信队列
当消息重试到最大重试次数后,依然失败,可以放入死信队列。如:
[topic]dead_$consumer_group
消息TTL
为了避免已经被消费的消息,占用大量的存储空间,消息会被清理。我们的策略是:
- 对于普通消息:3天后会自动清理,意味着一条消息3天内没被消费,将会被删除。
- 对于延迟消息:在截止时间的基础上+3天。
总结
本文的目的是介绍如何基于分布式KV研发一款MQ的核心思路,很多容灾/高可用/性能优化等方面的主题并没有讨论。仅仅是提供一种核心思路,如果希望在生产环境使用,需要进行大量的改进与优化。
作者:字节大力智能
链接:https://juejin.cn/post/7000561368633966623
来源:掘金
网友评论