美文网首页
浅析rocketmq顺序消息

浅析rocketmq顺序消息

作者: 秃秃少年小猪 | 来源:发表于2022-02-16 21:32 被阅读0次

    背景

    在一个支付场景中,需要先支开通会员,才可以短信收到开通会员对应的权益,假设开通会员的短信是在获取权益之前的,大概的流程图如下

    image-20220216210723046

    一但会员功能开通,为了降低系统的耦合性(高内聚,低耦合),系统可能会立马发出两个消息,一个是通知用户开通(A),第二个是通知权益到账(B),再由下游MSG系统来负责对接对接对应的渠道商。在这过程中,可能下游系统第二个比第一个消费的快,就导致权益到账先于会员开通提醒。那有没有什么好的办法来解决呢?

    解决办法

    问题查找

    产生以上问题的原因大概率的是这两条消息不在同一个队列中,导致了并排消费,切B早于A

    利用rocketmq的顺序消息

    在rmq中,有一个顺序消息的概念,可以保证让发送的消息发送到一个消息队列中去(MessageQueue)

    实现原理

    image-20220216211622252

    在rmq中的实现

    生产者api

     SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
    

    在我们发送一条消息的时候,我们可以指定一个 MessageQueueSelector (队列选择器),来指明消息需要发送到哪个队列中去

    参数解释

    参数名称 解释 备注
    msg 消息内容
    selector 队列路由规则
    arg 规则参数 一般是一个不变的key

    示例代码

      SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
                }, orderId); //orderId 为定值
    

    MessageQueueSelector代码解析

    1.只需要实现 MessageQueueSelector 这个接口就好了,系统也提供了一个默认的实现

    public interface MessageQueueSelector {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
    

    2.默认的实现

    image-20220216212621579
    实现名称 说明 备注
    哈希 通过hash算法路由
    机房 通过机房路由
    随机 随机路由

    几种实现

    哈希
    public class SelectMessageQueueByHash implements MessageQueueSelector {
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = arg.hashCode() % mqs.size();
            if (value < 0) {
                value = Math.abs(value);
            }
            return mqs.get(value);
        }
    }
    
    机房
    public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
        private Set<String> consumeridcs;
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            return null;
        }
    
        public Set<String> getConsumeridcs() {
            return consumeridcs;
        }
    
        public void setConsumeridcs(Set<String> consumeridcs) {
            this.consumeridcs = consumeridcs;
        }
    }
    
    随机
    public class SelectMessageQueueByRandom implements MessageQueueSelector {
        private Random random = new Random(System.currentTimeMillis());
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = random.nextInt(mqs.size());
            return mqs.get(value);
        }
    }
    

    总结

    rmq通过对指定的key进行规则路由,然后选取一个指定队列,把需要发送的消息发送到同一个队列中去,根据队列的FIFO特性,做到顺序消费。

    相关文章

      网友评论

          本文标题:浅析rocketmq顺序消息

          本文链接:https://www.haomeiwen.com/subject/mmodlrtx.html