过滤消息

作者: 念䋛 | 来源:发表于2021-06-19 09:09 被阅读0次

    过滤消息
    消息的过滤分为两种
    Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔,tag模式的粒度大.
    SQL92过滤方式:利用sql的方式更精确的过滤消息.
    RocketMQ只定义了一些基本语法来支持这个特性。
    SQL语法
    数值比较,比如:>,>=,<,<=,BETWEEN,=; 字符比较,比如:=,<>,IN; IS NULL 或者 IS NOT NULL; 逻辑符号 AND,OR,NOT;
    常量支持类型为:
    数值,比如:123,3.1415;
    字符,比如:'abc',必须用单引号包裹起来; NULL,特殊的常量 布尔值,TRUE 或 FALSE 使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
    过滤是broker服务端的行为,不需要消费端额外的过滤,这样减少了消费者服务器的负担
    TAG过滤

    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        //获取TagA  TagC两个tag的消息
        consumer.subscribe("TagFilterTest", "TagA || TagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
    
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    
    

    SQL模式
    生产者

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC"};
    
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("SqlFilterTest",
                tags[i % tags.length],
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            //sql用a is not null and a between 0 and 3
            //sql模式针对消息中的property属性,在消费端可以用a属性做过滤,也可以属性联合TAG做过滤
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
    
        producer.shutdown();
    }
    
    

    消费者

    public static void main(String[] args) throws Exception {
    
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        //tag在TagA和TagB中,属性a不为空并且在0和3之间,包含0和3 左闭 右闭
        consumer.subscribe("SqlFilterTest",
            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                "and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    
    

    相关文章

      网友评论

        本文标题:过滤消息

        本文链接:https://www.haomeiwen.com/subject/zaeryltx.html