消息过滤

作者: BlackManba_24 | 来源:发表于2018-05-21 19:22 被阅读5次

    在MQ模型中,一般都会有Topic模型,Topic表示一类消息的集合。

    在实际应用中,往往对一个Topic下的消息还会有不同的细分,消费方会根据细分的类型消费Topic中特定的一部分消息,这就涉及到了消息过滤。

    比如对于交易的Topic,内部可能有下单消息、支付消息。其中支付系统只希望消费到交易Topic下的支付消息,面对这个需求,我们应该如何在自己的MQ中去满足呢?

    image

    (图片引用自阿里云)

    业界实现

    RocketMQ

    RocketMQ支持在发送消息的时候给消息增加Tag(Tag可以理解为sub-topic,即在Topic下再对消息类型进行区分)。

    大多数场景下tag使用起来非常简单,如:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
    consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
    

    Consumer将收到Topic下Tag包含TAGA或TAGB或TAGC的消息。

    对于Tag过滤的限制是一条消息只能有一个Tag,这在一些复杂场景下可能没办法满足需求。

    RocketMQ提供另一种过滤方式:SQL92

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    
    // only subsribe messages have property a, also a >=0 and a <= 3
    consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
    
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    

    仅消费消息属性包含a,且a的值在0-3之间的消息。

    RocketMQ对消息过滤的支持比较完善了,通过SQL92这种方式可以满足各种复杂场景的需求了。

    Kafka

    Kafka目前并没有支持消息过滤,即没有在Topic下提供细分的类型来区分消息。

    用户可以在Kafka Streams中实现过滤。

    问题分析

    大致了解消息过滤的定义和业界的支持情况之后,回头再思考一下,为什么MQ需要做消息过滤、MQ的过滤应该做到什么程度(用使者需要怎么样的过滤方式呢)?试着回答以下的一些问题来弄清楚需求,划清楚问题边界。

    1. 为什么需要消息过滤?

    业务方(MQ使用方)过滤数据的需求是天然存在的,比如Topic模型也是一种过滤,从众多的数据中订阅自己需要的一部分数据。

    在上面这个前提下,逆向考虑这个问题:如果MQ不支持消息过滤(这里的过滤只Topic下的消息细分)但使用方又有过滤的需求,那么会出现什么情况?或者说业务方会怎么去解决这个问题?

    可以猜想大致会出现以下两种情况:

    1. 细分Topic,即将Topic再拆分的细一些,把二级类型直接作为Topic

    2. 在Consumer的消费逻辑中根据消息的属性或者内容决定是否过滤消息

    第一种情况在一些场景下实际上是无法做到的,比如本文开头的交易场景的例子。一旦对Topic进行了拆分,那么细分后的数据之间的消息顺序就无法保证了,但对于一个订单,它的下单、支付等操作显然是需要顺序被处理的。

    对于第二种情况,这也是业务方唯一能做的事情了。当然,也是最灵活的过滤方式了,业务方可以根据自己的需求制定过滤策略。但是带来的问题是所有的消息都需要从服务端先取回到客户端,这里的带宽浪费是比较严重的(取了大量客户端不需要的数据)。

    2. MQ对于过滤的需求需要支持到什么程度呢?

    对于这个问题,我在思考的时候考虑的是以下几个点:

    1. 业务方的过滤需求有哪些类型,是否可以穷举

    2. MQ的过滤功能能否覆盖掉用户的所有需求

    3. 以及支持消息过滤的成本

    显然,用户的过滤需求难以穷举,且业务在不断的变化。但是通过像SQL这样的方式,我们可以认为覆盖了用户所有的过滤需求(就像查MySQL数据,可以组合各种SQL来完成目标数据的获取)。然而还需要去考虑成本的问题,比如机器成本、过滤对消息RT的影响等等。

    所以在MQ的消息过滤中,我们期望能在成本和过滤能力之间找到一个平衡点,既能较好的支撑业务的过滤需求同时付出的成本在可接受范围内。

    上面这句话的具体含义可以这样理解:

    1. 对消息的写入和消费的RT影响可以忽略

    2. 没有额外的资源需求(业务量不变的情况下,过滤功能不需要额外的机器资源投入)

    3. 覆盖业务的日常过滤需求(满足业务方90%以上的过滤需求)

    站在巨人的肩膀上

    在理解完需求且清除的知道我们要做什么之后,再来看一下业界“大佬”是怎么解决这个问题的。

    RocketMQ Tag过滤

    Message包含一个Tag属性,String类型,发送方可以进行设置,通常我们称为打标。

    服务端在进行消息存储时,会将消息的Tag属性添加到消息索引中。Rocket的索引结构如下图:

    image.png

    索引元素包含三项内容:

    • offset:消息在存储文件中的偏移量

    • size:消息在存储文件中的大小

    • tag hashcode:消息的Tag属性的HashCode值

    为什么这里存的是Tag的哈希值而不是Tag本身的值呢?

    索引本身是为了加快消息的查询速度,所以它的元素是定长的,这就决定了无法在索引中直接存储Tag的值。

    因为索引中存储了Tag的哈希值,那么在进行消息读取时就可以根据用户的订阅请求进行消息匹配(可以在不读取存储文件的情况下完成消息的匹配,且开销可以不计)。

    但是因为这里比较的是HashCode,所以在消息返回到Consumer之后需要再进行一次真实值得比较,以避免消费到非期望的数据。

    那么增加了Tag之后,消息的读取流程如下:

    1. 获取用户读取消息的请求中期望的Tag的HashCode(可以是多个且进行||或者&&的运算)

    2. 读取索引元素,对比HashCode是否满足用户的过滤需求

    3. 从存储文件读取满足HashCode过滤条件的消息内容返回给Consumer

    4. Consumer反序列化消息,对比Tag值进一步确认消息是否期望数据

    RocketMQ SQL92过滤

    image.png
    1. Broker通过Consumer的心跳,在ConsumerFilterManager组件中保存Consumer的过滤信息(Expression)

    2. 当Consumer尝试读取消息时,Broker构造MessageFilter来过滤需要的数据

    RocketMQ SQL92过滤文档

    Tag VS SQL92

    |
    | Tag过滤 | SQL过滤 |
    | --- | --- | --- |
    | 覆盖场景 | 支持简单过滤(消息单Tag,可以订阅多Tag或按逻辑运算订阅Tag) | 支持复杂过滤 |
    | 实现成本 | 实现简单 | 实现复杂,涉及到SQL解析等 |
    | 对服务端的影响 | 服务端只进行简单的long值比较,代价低 | 服务端需要复杂的计算,代价高 |
    | 用户的使用成本 | 简单的Tag运算逻辑,对用户要求低 | 用户需要掌握一些SQL语法,相对来说复杂一些 |

    结论:

    • SQL覆盖场景更多,满足用户所有需求

    • Tag覆盖场景少,但是无论从实现成本或者使用成本上都要小一些

    所以在开发资源有限的情况下(比如没有足够的人手)要实现MQ中的过滤功能的话,Tag方式是一个更好的选择,SQL则作为不断完善的一个补充(没有也可以接受,有就最好了)。

    “万恶”的业务方

    “消息能不能支持多个Tag,这样发送的时候一条消息从不同的维度打上Tag来实现灵活的过滤需求”——from业务方

    比如一条订单消息可以按照支付方式打标,也可以按照商品品类打表,这样订阅时可以灵活的过滤出目标数据。

    message0.setTags(TagA);
    message1.setTags(TagB);
    message2.setTags(TagA,TagB,TagC);
    
    consumer0.subscribe(topic, TagA&&TagB);==>message2
    consumer1.subscribe(topic, TagA||TagC);==>message0, message2
    

    一是用户提出了这样的需求,二是在支持Tag之后我们也会去考虑Tag的方式还有没有优化空间。

    能不能支持一条消息有多个Tag?

    消息多Tag的问题其实和索引中无法存储Tag原始值的问题是一致的,都是导致索引结构的变化:索引存Tag值或者存多个Tag的HashCode都会导致索引元素的长度不固定,进而无法快速定位消息。

    此时最容易想到的方案就是扩展索引。

    扩展实现多Tag

    扩展索引的方式能保持消息索引依旧是定长的,把Tag相关的数据单独存储,只在有必要的时候读取Tag信息(用户有过滤需求时),如下图所示:

    image.png
    • 索引定长的,那么读消息时依旧可以快速定位到消息

    • 只要过滤的情况下读取Tag信息,对于读流程多了一次读Tag的操作

    • 对于写流程,除了原本的写存储文件和写索引文件外,另外需要写一份Tag文件

    这种方式实现多Tag需求是最直观的,缺陷也是最明显的:读写操作都多了一次Tag的操作。

    更进一步,有没有办法在多Tag的情况下避免掉这一次Tag的读写操作呢?

    不定长索引实现多Tag

    既然不能独立出Tag的存储文件,那么只能直接扩展原来的索引文件了,直接将多个Tag的HashCode存到索引中。

    面临的问题也非常清晰:不定长索引如何解决读取消息时索引的定位问题?

    image

    因为每个索引元素的长度是不确定的,当用户需要读Msg2时,就无法通过2*element size来计算索引位置。只能遍历索引了吗?但是遍历显然又是无法接受的!

    思考一下写消息的过程,我们是怎么确定消息在文件中的写入位置的呢?——追加到末尾。追加的过程其实是记录上一条消息写入后的位置,那么当前的消息就从之前的位置继续写。

    其实消息读取的过程也是一样的,虽然索引是不定长的,但是只要知道了上一条索引的位置和大小,就能定位到下一条消息索引的位置了。

    • 读第0条消息时,直接从索引的开头文件读取即可

    • 读第1条消息时,只需要知道第0条消息的大小即可:0 + msg0 size

    • 读第2条消息时,只需要知道第1条消息末尾的位置即可:msg1 offset + msg1 size

    • 读第N条消息时,只需知道第N-1条消息的索引的位置和大小即可

    那么,在读取第N条消息时其实只要知道第N-1条消息的索引位置就能快速定位出第N条消息的索引。

    而在消息的场景中,99.999%的情况下读完第N条消息时,下一次都会读取第N+1条。

    只有在少数异常的情况下需要修改offset的信息来读取之前或者之后的消息(而这种异常的场景下,可以通过一些优化的手段减少扫描的索引未见的数量来查找速度)。

    不定长索引的寻址过程如下:

    image

    此方案虽然解决了上一种扩展索引的方案带来的问题,但是并不能做到和RocketMQ通过SQL的方式支持灵活的过滤需求。

    总结

    本文从消息过滤的问题出发,介绍了RocketMQ的过滤功能实现,分析了消息过滤的需求,然后总结了不同的多Tag功能的实现方案。

    对于消息过滤的实现,没有哪一种方案是完美的,我们应当从自身的场景出发,考虑现实面对的成本等问题,综合考虑,选择一种最适用于自身业务场景的方案。

    欢迎关注我的公众号MessageQueue交流MQ相关内容。

    image

    相关文章

      网友评论

        本文标题:消息过滤

        本文链接:https://www.haomeiwen.com/subject/zpmqjftx.html