美文网首页
RocketMQ 7.消息过滤

RocketMQ 7.消息过滤

作者: 香沙小熊 | 来源:发表于2021-01-14 17:11 被阅读0次

    RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。

    20180705103335515.png

    1.Tag过滤方式

    image.png
    consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    

    Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

    1. 在Broker端进行Message Tag比对,先遍历 Consume Queue,如果存储的Message Tag 不订阅的 MessageTag不符合,则跳过,继续比对下一个,符合则传输给Consumer。
      注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。

    2. Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是 Hashcode。

    为什么过滤要这样做?

    1. Message Tag 存储 Hashcode,是为了在Consume Queue定长方式存储,节约空间。
    2. 过滤过程中不会访问 Commit Log 数据,可以保证堆积情况下也能高效过滤。
    3. 即使存在 Hash 冲突,也可以在 Consumer 端进行修正,保证万无一失。

    2. SQL92的过滤方式

    这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。

    RocketMQ支持根据用户自定义属性进行过滤,过滤表达式类似于SQL的where,如:a> 5 AND b ='abc'

    public class Producer {
    
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group_name");
            // 测试nameserver的地址
            producer.setNamesrvAddr(Const.NAMESRV_ADDR);
    
            // 启动生产者
            producer.start();
            // 生产者生产消息的时候设置tags,在这里可以通过设置不能的tags来获取对应的数据
            // tags 设置*表示 所有换消息, 使用 || 表示获取多个
            // Message message = new Message("my-topic", "add || update", msg.getBytes("UTF-8"));
            for (int i = 0; i < 10; i++) {
                String sex =  i%2==0?"Male":"Female";
                String msg = "sex:"+ sex +"  age:" +i;
    
                Message message = new Message("my-topic-filter", "add", msg.getBytes("UTF-8"));
                message.putUserProperty("sex", sex);
                message.putUserProperty("age", i+"");
    
                SendResult sendResult = producer.send(message);
    //        System.out.println("消息id:" + sendResult.getMsgId());
    //        System.out.println("消息队列:" + sendResult.getMessageQueue());
    //        System.out.println("消息偏移量:" + sendResult.getQueueOffset());
                System.out.println(sendResult);
            }
    
            producer.shutdown();
        }
    
    }
    
    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_group_name");
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
            consumer.subscribe("my-topic-filter", MessageSelector.bySql("sex='Female' AND age >= 5"));
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg: msgs) {
                            System.out.println("消息:" + new String (msg.getBody(), "UTF-8"));
                        }
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
    //                System.out.println("接收到的消息" + msgs);
                    // 返回消息是否消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            
            consumer.start();
        }
    }
    

    先后运行Producer、Consumer

    消息:sex:Female  age:7
    消息:sex:Female  age:5
    消息:sex:Female  age:9
    

    相关文章

      网友评论

          本文标题:RocketMQ 7.消息过滤

          本文链接:https://www.haomeiwen.com/subject/wkbvaktx.html