背景
在一个支付场景中,需要先支开通会员,才可以短信收到开通会员对应的权益,假设开通会员的短信是在获取权益之前的,大概的流程图如下
image-20220216210723046一但会员功能开通,为了降低系统的耦合性(高内聚,低耦合),系统可能会立马发出两个消息,一个是通知用户开通(A),第二个是通知权益到账(B),再由下游MSG系统来负责对接对接对应的渠道商。在这过程中,可能下游系统第二个比第一个消费的快,就导致权益到账先于会员开通提醒。那有没有什么好的办法来解决呢?
解决办法
问题查找
产生以上问题的原因大概率的是这两条消息不在同一个队列中,导致了并排消费,切B早于A
利用rocketmq的顺序消息
在rmq中,有一个顺序消息的概念,可以保证让发送的消息发送到一个消息队列中去(MessageQueue)
实现原理
image-20220216211622252在rmq中的实现
生产者api
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
在我们发送一条消息的时候,我们可以指定一个 MessageQueueSelector (队列选择器),来指明消息需要发送到哪个队列中去
参数解释
参数名称 | 解释 | 备注 |
---|---|---|
msg | 消息内容 | |
selector | 队列路由规则 | |
arg | 规则参数 | 一般是一个不变的key |
示例代码
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId); //orderId 为定值
MessageQueueSelector代码解析
1.只需要实现 MessageQueueSelector 这个接口就好了,系统也提供了一个默认的实现
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
2.默认的实现
image-20220216212621579实现名称 | 说明 | 备注 |
---|---|---|
哈希 | 通过hash算法路由 | |
机房 | 通过机房路由 | |
随机 | 随机路由 |
几种实现
哈希
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode() % mqs.size();
if (value < 0) {
value = Math.abs(value);
}
return mqs.get(value);
}
}
机房
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
随机
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}
总结
rmq通过对指定的key进行规则路由,然后选取一个指定队列,把需要发送的消息发送到同一个队列中去,根据队列的FIFO特性,做到顺序消费。
网友评论