一、背景
类似于消息的灰度标识,消息本来是所有节点都会消费的,但是打上了灰度标识的消息,只有灰度节点才能消费。这样,就使用tag机制将消息隔离开来。
借鉴这种思路,我们在websocket集群开发过程中,ws消息发送给建立通道的节点,故对消息增加tag标签,仅让建立通道的节点才消费。
二、部署示意图
image.png老师和学生A连接通道节点1,学生B连接的通道是节点2。
现在,老师需要发送ws消息给A和B,经查找,A和老师在同一个节点,找到A的通道,直接发送ws消息即可。但是,B在其他节点,则需要通过mq消息转发。
mq消息想要只让节点2消费,而不能被节点1等其他节点消费掉。
-
rocketmq队列是test-topic,见下图:
image.png
三、问题描述
下面的场景,都是两个节点订阅的tag不同的前提。
1、消费组相同的情况下,使用集群模式。
老师给学生A和B发送10条消息,A全部能收到;B收到的消息数量不定,总是会丢失消息。(有时4条,有时5条,有时6条)
- 备注:这种结果,出乎我的预计。老师发给学生B的消费,明明打上了tag,怎么还会被其他人消费了呢。第二个疑问是,如果节点1消费了,onMessage()方法怎么不会打印消费的日志。第三个疑问是,我查看rocketmq控制台,发现10条消息都被消费了。
- 总结:我于是把节点1的消费代码先注释掉,再测试发现,这之后就正常了。所以,节点1消费mq消息的环节是在onMessage()之前。
2、消费组不同的情况下,使用集群模式。
老师给学生A和B发送10条消息,A和B都全部能收到。
-
这不让服务变成有状态了吗? 接着本文开头所讲,灰度消息只被灰度节点消费,在给消息打上灰度标签的时候,还必须更改灰度节点的消费组名称吗?如果在rabbitmq中要实现Mq消息的灰度,维度是队列,而rocketmq这里的队列不会增加,通过更细的维度--tag标签来区分灰度消息。
-
在rocketmq控制台,可以看到它的组也变成了两个。
image.png -
同一个topic,两个节点对应不同的订阅组,所以同一个mq消息会被各自消费。但是,他们的消费状态不同 ,注意看下图。
image.png -
同一个topic,不同的tag消息,为所有的消费者消费。(但如果消费组只有一个,则可能“丢消息”。)
3、消费组相同的情况下,使用广播模式。
messageModel默认是集群模式。
老师给学生A和B发送10条消息,A和B都全部能收到。
image.pngimport com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void send(OutMessage message, String tag) {
log.info("发送mq消息:{}, tag={}", JSON.toJSONString(message), tag);
rocketMQTemplate.convertAndSend("test-topic" + ":" + tag, message);
}
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
@Component
@Slf4j
@RocketMQMessageListener(topic = "test-topic",
nameServer = "${rocketmq.nameServer}",
consumerGroup = "${rocketmq.consumer.group}",
messageModel = MessageModel.BROADCASTING)
public class MQReceiver implements RocketMQListener<OutMessage> {
@Override
public void onMessage(OutMessage outMessage) {
log.info("收到mq消息:{}", JSON.toJSONString(outMessage));
}
}
修改注解@RocketMQMessageListener中的属性selectorExpression:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;
/**
* 因为RocketMQMessageListener不提供动态配置功能
* springboot初始化后rocket容器初始化前利用反射动态改变
* RocketMQMessageListener注解selectorExpression的值
*/
@Component
public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
RocketMQMessageListener annoTable =
MQReceiver.class.getAnnotation(RocketMQMessageListener.class);
// 获取代理处理器
InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable);
// 获取私有 memberValues 属性
Field f = invocationHandler.getClass().getDeclaredField("memberValues");
f.setAccessible(true);
// 获取实例的属性map
Map<String, Object> memberValues = (Map<String, Object>)
f.get(invocationHandler);
// 修改属性值
memberValues.put("selectorExpression", "abc");
}
}
四、问一问chatGPT
rocketmq之消费消息.png五、总结
回到我们的方案选择上,topic和消费组都相同,只tag不一样,要实现预期 的效果,则消息模式要指定为广播模式。
而我们原本的程序设计,也是采用rabbitmq发送广播,让所有的节点去消费一遍。这样,无法减少每个节点处理mq消息的数量,不利于通道节点的水平扩容。
如果同一topic下由多个消费组去消费,也就是说N个节点,就N个消费组。随着节点的水平扩容,消费组的数量也随着增加。消费组的名称可以取机器的Mac地址,保证其唯一性。
对于这个方法,至少损失了服务的无状态性,不是很好理解,感觉是为了做而做,比较鸡肋。
消费组的初衷是让不同的服务在订阅同一个topic而设计。
网友评论