美文网首页Java 程序员Java
如何基于分布式KV研发一款消息中间件

如何基于分布式KV研发一款消息中间件

作者: 程序花生 | 来源:发表于2021-08-26 16:52 被阅读0次

背景

目前笔者所在团队正常研发一款流程编排引擎,其中有多个功能特性需要MQ的延迟消息/消费者重试等特性。经过多方面的考量,我们最终决定采用计算存储分离的架构,在分布式KV存储的基础上,研发一款定制化的MQ。目前,其具备了MQ的主要特性。本文所描述的是基于分布式KV的基础上研发MQ的核心思路。

术语

  • Message: 消息
  • Topic: 消息的逻辑分类
  • Partition: 分区,一个Topic中包含多个分区,每个消息最终发送到Topic的某个分区中
  • Partition Offset: 每当消息被发送到某个Partition中,这个Partition的Offset+1
  • Producer: 生产者,发送消息到某个Topic下的某个分区
  • Consumer Group: 消费者组。一个Topic可以有多个不同的Consumer Group进行消费,每个Consumer Group可以消费到Topic中的全量消息。
  • Consumer: 消费者,消费Topic中的消息。如果一个Consumer Group下只有一个Consumer,则其会消费到全部消息;如果有多个Consumer,则每个Consumer只消费部分消息。
  • Consumer Group Offset: 记录消费者组,针对某个Topic下所有Partition当前消费到位置。针对每个Partition,不会超过Partition 最大Offset。
  • Broker: 消息代理,Producer将消息发送给Broker,由broker发送到topic下某个分区。Consumer连接Broker,从Broker消费消息。
  • Broker Cluster: Broker集群,为了实现高可用,每个Broker集群下包含多个Broke实例
  • Rebalance: 再均衡。传统的实现中,一个Consumer Group下的Consumer数量不能超过这个Topic下Partition的数量,一个Partition最多只能分配给一个消费者,超出Partition数量的消费者无法消费到消息。在本文实现中,不存在Rebalance概念。消费者数量,不受Partition限制。
  • Delay Message: 延迟消息。通常情况下,一个消息被投递到Topic中,就会被立即消费。延迟消息的意思是,延迟指定时间后,才可以被消费。
  • Retry Message: 重试消息。当一条消息消费失败后,需要被重试,即按照一定的重试策略,重新让消费者来消费这条消息。

整体架构

说明:

KV 存储

本文以KV Storage是Redis Cluster为例进行讲解,Broker Cluster中的所有Broker实例共享这个Redis Cluster,其他任意支持按key排序scan的KV存储均可。

从持久化的角度来说,使用内存模式Redis并不合适,这里意图在于说明基于分布式KV实现的MQ的核心原理。事实上,在公司内部,我们使用的是基于RocksDB基础上研发的分布式KV,在网络通信上兼容redis协议。

为了简化,本文中不讨论Redis Cluster扩/缩容,Slot迁移的情况。但足以掌握基于分布式KV研发一款消息中间件的核心原理。

网络通信

Producer和Consumer与Broker的通信,在笔者的项目中,使用的是Grpc。在开源的通信框架中,Grpc可以说是最流行的方案,Apacha RocketMQ 5.x版本也采用了Grpc。在本文中,并不会对Grpc进行介绍。

详细设计

Broker集群元数据

每个Broker启动时,可以将自身信息注册到Redis中,以便producer/consumer进行服务发现。例如通过hash结构维护:

Key

[cluster]$cluster_name
复制代码

Value

filed         value

broker1       ip:port

broker2       ip:port
复制代码

Topic元数据

Topic元数据主要是维护Topic下有多少Partition,这些Partition在Redis Cluster中是如何分布的。用户在创建Topic时,指定分区数量。

Redis Cluster有16384个槽,每个Redis分片负责其中部分槽。当创建一个Topic时,例如指定10个分区,可以按照一定策略把这个10个分区映射到不同的槽上,相当于间接的把分区分配到了不同的redis分片上。

当创建好一个Topic之后,将Topic下的分区分配给不同的Broker。例如10个分区,10个Broker,则每个Broker负责一个分区。如果只有5个分区,那么需要分配给其中5个broker。

例如通过hash结构维护维护这个映射关系

key

[topic_metadata]$topic_name

value

filed         value

partition1    broker1

partition2    broker2

消息

消息使用protobuf进行定义:

message Message{

  google.protobuf.Struct metadata = 1;  //消息的元数据

  string partition = 2;  //消息所属的分区

  int64 offset = 3;      //消息的offset

  string msgId = 4;      //消息的唯一id

  string topic = 5;      //消息的topic

  string key = 6;        //消息key,用于路由

  bytes body = 7;        //消息体

  google.protobuf.Timestamp born_time = 8;  //消息生成时间 

  google.protobuf.Timestamp expireTime = 9; //消息截止时间,用于延迟消息

}

生产者在发送消息时,最简单的情况下,只需要指定消息的topic、body。当有其他特殊需求时,可以指定以下字段:

  • key:具有相同key的消息,经过hash算法,写入到相同的分区。
  • partition:直接指定分区,不根据key计算。
  • expireTime:延迟消息。消息不希望被立即消费,而是到指定时间后,才会被消费。

消息发送

从Producer的角度来说:

  • 重试: 发送一条消息到broker可能失败,所以需要重试。重试需要设置一定的次数和超时时间,在超时时间内进行重试。
  • 分区选择: 选择分区应该在producer端确定,确定分区后,消息发送到分区所属的broker。
  • 聚合: 为了减少网络io,应该聚合批次进行发送,注意聚合是按照分区进行聚合
  • broker选择: 对于无序消息,选择broker可以有一定的策略,例如某个broker失败率比较高,或者延迟比较高,则应该优先选择其他的broker。

从broker角度来说:

接收到一条消息时,offset信息维护。每次发送,在确定消息需要发送到的分区后,broker需要将对应partition的offset+1。在笔者的项目中,使用了hash结构存储每个分区的最大的offset:

key:

[topic_offset]{$topic}

value

field             value

partition 1       offset1

partition 2       offset2

为了提升offset维护的效率,不需要每次都调用HINCRBY,而是在broker启动时,将自己维护的分区offset信息加载到内存中,之后发送消息时,内存中增加,定期保存到KV中。

此外,需要有一个修正offset的逻辑,避免broker异常宕机的情况下,offset没有成功保存到redis中。在broker启动时,可以从当前维护的最大offset开始往后扫描,如果发现了新消息,则说明offset需要修正(参考如下消息存储部分)。

消息存储

当消息被写入到redis中,key满足以下格式:

[topic]{$topic_$partition}$offset

其中:

  • [topic]:是固定前缀
  • {topic_partition}:<math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>t</mi><mi>o</mi><mi>p</mi><mi>i</mi><mi>c</mi><mtext>是</mtext><mi>t</mi><mi>o</mi><mi>p</mi><mi>i</mi><mi>c</mi><mtext>名称,</mtext></mrow><annotation encoding="application/x-tex">topic是topic名称,</annotation></semantics></math>topic是topic名称,partition表示分区。这里利用了redis hash tag能力。
  • $offset:表示当前这个分区的offset

消息拉取

拉取通过redis scan操作进行,将scan到的消息交由消费者处理。

在拉取消息时,依赖于一个consumer offset,其维护了某个consumer group消费某个topic的进度信息。拉取时,从这个位置开始。这里可以考虑使用hash数据结构:

key:

[consumer_offset]$group_$topic

value

field             value

partition 1       offset1

partition 2       offset2

当有消费者连接上某个broker时,broker查询到自己负责的分区的parititon offset,从这个位置开始拉取消息。

延迟消息

对于所有延迟消息,会首先发送到一个特殊的delay topic中,相当于暂存这个消息。消息到期后,投递到目标topic中。

  • 在发送到延迟topic之前,会记录消息的原始topic、partition到metadata的origin_topic origin_partition字段中。之后发送到delay topic中。key格式与普通消息不同,以时间戳排序:
[delay]$broker_id}$expireTime
  • 会有一个延迟消息转发器,不断的扫描abase,当发现有消息到期时,修改延迟消息中的目标topic为origin_topicorigin_partition字段,之后从发送逻辑,投递到目标topic中。
  • 同时,会记录当前扫描到的位置。

消费重试

当消费者消费一条消息失败时,默认也是会走延迟消息逻辑,到期后,投递给目标消费者,重新消费。重试消息,也是基于延迟消息的基础上开发的。

在逻辑上不同的是:

  • 延迟消息直接投递给了目标topic
  • 而重试消息不能投递给你目标topic,因为一个topic有多个consumer group,如果只是某个consumer group失败需要重试,那么其他consumer group应该不受影响。因此每个consumer_group,应该有独立的重试topic。如:
[topic]retry_$consumer_group

这个Topic下应该也有包含分区,策略与之前所述Topic元数据维护相同。

死信队列

当消息重试到最大重试次数后,依然失败,可以放入死信队列。如:

[topic]dead_$consumer_group

消息TTL

为了避免已经被消费的消息,占用大量的存储空间,消息会被清理。我们的策略是:

  • 对于普通消息:3天后会自动清理,意味着一条消息3天内没被消费,将会被删除。
  • 对于延迟消息:在截止时间的基础上+3天。

总结

本文的目的是介绍如何基于分布式KV研发一款MQ的核心思路,很多容灾/高可用/性能优化等方面的主题并没有讨论。仅仅是提供一种核心思路,如果希望在生产环境使用,需要进行大量的改进与优化。

作者:字节大力智能
链接:https://juejin.cn/post/7000561368633966623
来源:掘金

相关文章

网友评论

    本文标题:如何基于分布式KV研发一款消息中间件

    本文链接:https://www.haomeiwen.com/subject/kaeuiltx.html