美文网首页
Rocketmq的消息tag适用场景分析

Rocketmq的消息tag适用场景分析

作者: 天草二十六_简村人 | 来源:发表于2023-02-19 14:37 被阅读0次

一、背景

类似于消息的灰度标识,消息本来是所有节点都会消费的,但是打上了灰度标识的消息,只有灰度节点才能消费。这样,就使用tag机制将消息隔离开来。

借鉴这种思路,我们在websocket集群开发过程中,ws消息发送给建立通道的节点,故对消息增加tag标签,仅让建立通道的节点才消费。

二、部署示意图

image.png

老师和学生A连接通道节点1,学生B连接的通道是节点2。
现在,老师需要发送ws消息给A和B,经查找,A和老师在同一个节点,找到A的通道,直接发送ws消息即可。但是,B在其他节点,则需要通过mq消息转发。

mq消息想要只让节点2消费,而不能被节点1等其他节点消费掉。

  • rocketmq队列是test-topic,见下图:


    image.png

三、问题描述

下面的场景,都是两个节点订阅的tag不同的前提。

1、消费组相同的情况下,使用集群模式。

老师给学生A和B发送10条消息,A全部能收到;B收到的消息数量不定,总是会丢失消息。(有时4条,有时5条,有时6条)

  • 备注:这种结果,出乎我的预计。老师发给学生B的消费,明明打上了tag,怎么还会被其他人消费了呢。第二个疑问是,如果节点1消费了,onMessage()方法怎么不会打印消费的日志。第三个疑问是,我查看rocketmq控制台,发现10条消息都被消费了。
image.png
  • 总结:我于是把节点1的消费代码先注释掉,再测试发现,这之后就正常了。所以,节点1消费mq消息的环节是在onMessage()之前。

2、消费组不同的情况下,使用集群模式。

老师给学生A和B发送10条消息,A和B都全部能收到。

  • 这不让服务变成有状态了吗? 接着本文开头所讲,灰度消息只被灰度节点消费,在给消息打上灰度标签的时候,还必须更改灰度节点的消费组名称吗?如果在rabbitmq中要实现Mq消息的灰度,维度是队列,而rocketmq这里的队列不会增加,通过更细的维度--tag标签来区分灰度消息。

  • 在rocketmq控制台,可以看到它的组也变成了两个。


    image.png
  • 同一个topic,两个节点对应不同的订阅组,所以同一个mq消息会被各自消费。但是,他们的消费状态不同 ,注意看下图。


    image.png
  • 同一个topic,不同的tag消息,为所有的消费者消费。(但如果消费组只有一个,则可能“丢消息”。)

3、消费组相同的情况下,使用广播模式。

messageModel默认是集群模式。

老师给学生A和B发送10条消息,A和B都全部能收到。

image.png
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void send(OutMessage message, String tag) {
        log.info("发送mq消息:{}, tag={}", JSON.toJSONString(message), tag);
        rocketMQTemplate.convertAndSend("test-topic" + ":" + tag, message);
    }
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;

@Component
@Slf4j
@RocketMQMessageListener(topic = "test-topic",
        nameServer = "${rocketmq.nameServer}",
        consumerGroup = "${rocketmq.consumer.group}",
        messageModel = MessageModel.BROADCASTING)
public class MQReceiver implements RocketMQListener<OutMessage> {
    @Override
    public void onMessage(OutMessage outMessage) {
       log.info("收到mq消息:{}", JSON.toJSONString(outMessage));
    }
}

修改注解@RocketMQMessageListener中的属性selectorExpression:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;

/**
 * 因为RocketMQMessageListener不提供动态配置功能
 * springboot初始化后rocket容器初始化前利用反射动态改变
 * RocketMQMessageListener注解selectorExpression的值
 */
@Component
public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean {

    @Override
    public void afterPropertiesSet() throws Exception {
        RocketMQMessageListener annoTable =
                MQReceiver.class.getAnnotation(RocketMQMessageListener.class);
        // 获取代理处理器
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable);
        // 获取私有 memberValues 属性
        Field f = invocationHandler.getClass().getDeclaredField("memberValues");
        f.setAccessible(true);
        // 获取实例的属性map
        Map<String, Object> memberValues = (Map<String, Object>)
                f.get(invocationHandler);
        // 修改属性值
        memberValues.put("selectorExpression", "abc");
    }

}

四、问一问chatGPT

rocketmq之消费消息.png

五、总结

回到我们的方案选择上,topic和消费组都相同,只tag不一样,要实现预期 的效果,则消息模式要指定为广播模式。

而我们原本的程序设计,也是采用rabbitmq发送广播,让所有的节点去消费一遍。这样,无法减少每个节点处理mq消息的数量,不利于通道节点的水平扩容。

如果同一topic下由多个消费组去消费,也就是说N个节点,就N个消费组。随着节点的水平扩容,消费组的数量也随着增加。消费组的名称可以取机器的Mac地址,保证其唯一性。

对于这个方法,至少损失了服务的无状态性,不是很好理解,感觉是为了做而做,比较鸡肋。

消费组的初衷是让不同的服务在订阅同一个topic而设计。

参考链接

相关文章

  • Rocketmq的消息tag适用场景分析

    一、背景 类似于消息的灰度标识,消息本来是所有节点都会消费的,但是打上了灰度标识的消息,只有灰度节点才能消费。这样...

  • RocketMQ(十)高级特性-消息过滤

    RocketMq提供消息过滤的功能,用于同一topic下,区分不同业务场景的消息。 Tag,即消息标签,用于对某个...

  • RocketMq 堆积查询

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 Rocke...

  • RocketMq 事务消息

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 Rocke...

  • RocketMq 消费位点上报

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 Rocke...

  • RocketMq 消息查询

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 Rocke...

  • RocketMQ 同步调用的新特性

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 开篇 这个...

  • RocketMq 广播模式

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 开篇 这个...

  • RocketMq 消息Tag过滤

    系列 RocketMq 消息Tag过滤 RocketMq 广播模式 RocketMQ 同步调用的新特性 开篇 这个...

  • RocketMQ SQL92过滤简析

    RocketMQ 4.x的版本对于消息过滤提供了2种常用的方式: tag SQL92 在大部分的场景下,tag过滤...

网友评论

      本文标题:Rocketmq的消息tag适用场景分析

      本文链接:https://www.haomeiwen.com/subject/eimikdtx.html