在MQ模型中,一般都会有Topic模型,Topic表示一类消息的集合。
在实际应用中,往往对一个Topic下的消息还会有不同的细分,消费方会根据细分的类型消费Topic中特定的一部分消息,这就涉及到了消息过滤。
比如对于交易的Topic,内部可能有下单消息、支付消息。其中支付系统只希望消费到交易Topic下的支付消息,面对这个需求,我们应该如何在自己的MQ中去满足呢?
image(图片引用自阿里云)
业界实现
RocketMQ
RocketMQ支持在发送消息的时候给消息增加Tag(Tag可以理解为sub-topic,即在Topic下再对消息类型进行区分)。
大多数场景下tag使用起来非常简单,如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
Consumer将收到Topic下Tag包含TAGA或TAGB或TAGC的消息。
对于Tag过滤的限制是一条消息只能有一个Tag,这在一些复杂场景下可能没办法满足需求。
RocketMQ提供另一种过滤方式:SQL92
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
仅消费消息属性包含a,且a的值在0-3之间的消息。
RocketMQ对消息过滤的支持比较完善了,通过SQL92这种方式可以满足各种复杂场景的需求了。
Kafka
Kafka目前并没有支持消息过滤,即没有在Topic下提供细分的类型来区分消息。
用户可以在Kafka Streams中实现过滤。
问题分析
大致了解消息过滤的定义和业界的支持情况之后,回头再思考一下,为什么MQ需要做消息过滤、MQ的过滤应该做到什么程度(用使者需要怎么样的过滤方式呢)?试着回答以下的一些问题来弄清楚需求,划清楚问题边界。
1. 为什么需要消息过滤?
业务方(MQ使用方)过滤数据的需求是天然存在的,比如Topic模型也是一种过滤,从众多的数据中订阅自己需要的一部分数据。
在上面这个前提下,逆向考虑这个问题:如果MQ不支持消息过滤(这里的过滤只Topic下的消息细分)但使用方又有过滤的需求,那么会出现什么情况?或者说业务方会怎么去解决这个问题?
可以猜想大致会出现以下两种情况:
-
细分Topic,即将Topic再拆分的细一些,把二级类型直接作为Topic
-
在Consumer的消费逻辑中根据消息的属性或者内容决定是否过滤消息
第一种情况在一些场景下实际上是无法做到的,比如本文开头的交易场景的例子。一旦对Topic进行了拆分,那么细分后的数据之间的消息顺序就无法保证了,但对于一个订单,它的下单、支付等操作显然是需要顺序被处理的。
对于第二种情况,这也是业务方唯一能做的事情了。当然,也是最灵活的过滤方式了,业务方可以根据自己的需求制定过滤策略。但是带来的问题是所有的消息都需要从服务端先取回到客户端,这里的带宽浪费是比较严重的(取了大量客户端不需要的数据)。
2. MQ对于过滤的需求需要支持到什么程度呢?
对于这个问题,我在思考的时候考虑的是以下几个点:
-
业务方的过滤需求有哪些类型,是否可以穷举
-
MQ的过滤功能能否覆盖掉用户的所有需求
-
以及支持消息过滤的成本
显然,用户的过滤需求难以穷举,且业务在不断的变化。但是通过像SQL这样的方式,我们可以认为覆盖了用户所有的过滤需求(就像查MySQL数据,可以组合各种SQL来完成目标数据的获取)。然而还需要去考虑成本的问题,比如机器成本、过滤对消息RT的影响等等。
所以在MQ的消息过滤中,我们期望能在成本和过滤能力之间找到一个平衡点,既能较好的支撑业务的过滤需求同时付出的成本在可接受范围内。
上面这句话的具体含义可以这样理解:
-
对消息的写入和消费的RT影响可以忽略
-
没有额外的资源需求(业务量不变的情况下,过滤功能不需要额外的机器资源投入)
-
覆盖业务的日常过滤需求(满足业务方90%以上的过滤需求)
站在巨人的肩膀上
在理解完需求且清除的知道我们要做什么之后,再来看一下业界“大佬”是怎么解决这个问题的。
RocketMQ Tag过滤
Message包含一个Tag属性,String类型,发送方可以进行设置,通常我们称为打标。
服务端在进行消息存储时,会将消息的Tag属性添加到消息索引中。Rocket的索引结构如下图:
image.png索引元素包含三项内容:
-
offset:消息在存储文件中的偏移量
-
size:消息在存储文件中的大小
-
tag hashcode:消息的Tag属性的HashCode值
为什么这里存的是Tag的哈希值而不是Tag本身的值呢?
索引本身是为了加快消息的查询速度,所以它的元素是定长的,这就决定了无法在索引中直接存储Tag的值。
因为索引中存储了Tag的哈希值,那么在进行消息读取时就可以根据用户的订阅请求进行消息匹配(可以在不读取存储文件的情况下完成消息的匹配,且开销可以不计)。
但是因为这里比较的是HashCode,所以在消息返回到Consumer之后需要再进行一次真实值得比较,以避免消费到非期望的数据。
那么增加了Tag之后,消息的读取流程如下:
-
获取用户读取消息的请求中期望的Tag的HashCode(可以是多个且进行||或者&&的运算)
-
读取索引元素,对比HashCode是否满足用户的过滤需求
-
从存储文件读取满足HashCode过滤条件的消息内容返回给Consumer
-
Consumer反序列化消息,对比Tag值进一步确认消息是否期望数据
RocketMQ SQL92过滤
image.png-
Broker通过Consumer的心跳,在ConsumerFilterManager组件中保存Consumer的过滤信息(Expression)
-
当Consumer尝试读取消息时,Broker构造MessageFilter来过滤需要的数据
RocketMQ SQL92过滤文档
Tag VS SQL92
|
| Tag过滤 | SQL过滤 |
| --- | --- | --- |
| 覆盖场景 | 支持简单过滤(消息单Tag,可以订阅多Tag或按逻辑运算订阅Tag) | 支持复杂过滤 |
| 实现成本 | 实现简单 | 实现复杂,涉及到SQL解析等 |
| 对服务端的影响 | 服务端只进行简单的long值比较,代价低 | 服务端需要复杂的计算,代价高 |
| 用户的使用成本 | 简单的Tag运算逻辑,对用户要求低 | 用户需要掌握一些SQL语法,相对来说复杂一些 |
结论:
-
SQL覆盖场景更多,满足用户所有需求
-
Tag覆盖场景少,但是无论从实现成本或者使用成本上都要小一些
所以在开发资源有限的情况下(比如没有足够的人手)要实现MQ中的过滤功能的话,Tag方式是一个更好的选择,SQL则作为不断完善的一个补充(没有也可以接受,有就最好了)。
“万恶”的业务方
“消息能不能支持多个Tag,这样发送的时候一条消息从不同的维度打上Tag来实现灵活的过滤需求”——from业务方
比如一条订单消息可以按照支付方式打标,也可以按照商品品类打表,这样订阅时可以灵活的过滤出目标数据。
message0.setTags(TagA);
message1.setTags(TagB);
message2.setTags(TagA,TagB,TagC);
consumer0.subscribe(topic, TagA&&TagB);==>message2
consumer1.subscribe(topic, TagA||TagC);==>message0, message2
一是用户提出了这样的需求,二是在支持Tag之后我们也会去考虑Tag的方式还有没有优化空间。
能不能支持一条消息有多个Tag?
消息多Tag的问题其实和索引中无法存储Tag原始值的问题是一致的,都是导致索引结构的变化:索引存Tag值或者存多个Tag的HashCode都会导致索引元素的长度不固定,进而无法快速定位消息。
此时最容易想到的方案就是扩展索引。
扩展实现多Tag
扩展索引的方式能保持消息索引依旧是定长的,把Tag相关的数据单独存储,只在有必要的时候读取Tag信息(用户有过滤需求时),如下图所示:
image.png-
索引定长的,那么读消息时依旧可以快速定位到消息
-
只要过滤的情况下读取Tag信息,对于读流程多了一次读Tag的操作
-
对于写流程,除了原本的写存储文件和写索引文件外,另外需要写一份Tag文件
这种方式实现多Tag需求是最直观的,缺陷也是最明显的:读写操作都多了一次Tag的操作。
更进一步,有没有办法在多Tag的情况下避免掉这一次Tag的读写操作呢?
不定长索引实现多Tag
既然不能独立出Tag的存储文件,那么只能直接扩展原来的索引文件了,直接将多个Tag的HashCode存到索引中。
面临的问题也非常清晰:不定长索引如何解决读取消息时索引的定位问题?
image因为每个索引元素的长度是不确定的,当用户需要读Msg2时,就无法通过2*element size来计算索引位置。只能遍历索引了吗?但是遍历显然又是无法接受的!
思考一下写消息的过程,我们是怎么确定消息在文件中的写入位置的呢?——追加到末尾。追加的过程其实是记录上一条消息写入后的位置,那么当前的消息就从之前的位置继续写。
其实消息读取的过程也是一样的,虽然索引是不定长的,但是只要知道了上一条索引的位置和大小,就能定位到下一条消息索引的位置了。
-
读第0条消息时,直接从索引的开头文件读取即可
-
读第1条消息时,只需要知道第0条消息的大小即可:0 + msg0 size
-
读第2条消息时,只需要知道第1条消息末尾的位置即可:msg1 offset + msg1 size
-
读第N条消息时,只需知道第N-1条消息的索引的位置和大小即可
那么,在读取第N条消息时其实只要知道第N-1条消息的索引位置就能快速定位出第N条消息的索引。
而在消息的场景中,99.999%的情况下读完第N条消息时,下一次都会读取第N+1条。
只有在少数异常的情况下需要修改offset的信息来读取之前或者之后的消息(而这种异常的场景下,可以通过一些优化的手段减少扫描的索引未见的数量来查找速度)。
不定长索引的寻址过程如下:
image此方案虽然解决了上一种扩展索引的方案带来的问题,但是并不能做到和RocketMQ通过SQL的方式支持灵活的过滤需求。
总结
本文从消息过滤的问题出发,介绍了RocketMQ的过滤功能实现,分析了消息过滤的需求,然后总结了不同的多Tag功能的实现方案。
对于消息过滤的实现,没有哪一种方案是完美的,我们应当从自身的场景出发,考虑现实面对的成本等问题,综合考虑,选择一种最适用于自身业务场景的方案。
欢迎关注我的公众号MessageQueue交流MQ相关内容。
image
网友评论