RocketMQ顺序消息

作者: mingxungu | 来源:发表于2020-04-11 16:36 被阅读0次

    我们知道消息队列的特性导致其消息不是顺序进行消费的,RocketMQ没有提供所谓的顺序消息来供我们使用,但是有时候一些场景需要需要顺序的去接收消息。今天我们重点讨论一下如何实现这种功能。虽然RocketMQ没有提供顺序消费但是我们可以变相的来实现它。我们知道消息需要放入队列中才能被消费,而队列本身的特性就是FIFO先进先出,我们可以将需要顺序的消息放入一个队列中,则就可以实现这个功能。

    1、场景分析

    场景:两个业务系统之间消息通过MQ传输,业务系统A数据传输至业务系统B,要求消息准确、实时。但是业务系统A的原始的数据可能会存在修改的情况,要求业务系统B需要实时的更改。保证消息的实时性、一致性、可靠性。

    1.1、默认消息生产过程分析

    image

    主题test_1发送消息到RocketMQ的双主Broker1、Broker2上,每个broker上test_1主题对应4个队列,消息id为001001的消息存在创建(create)、更新(update),MQ集群是双主的,使用默认的消息发送算法,消息将轮询的丢弃到各个队列中。

    默认按照轮询算法将消息分发到各个broker的不同的队列中,保证每个队列的消息都是均匀分配,集群消费且消费者多个时,多个消费者会分散到不同的队列中消费消息,保证消息能够实时消费。

    因为消息本身是放入到不同的队列中消费的就不能保证其顺序性,更新的消息可能是最先被消费掉,创建的消息消费时业务需要判断消息是否是最新的,需要进行查库验证,是则更新,不是则丢弃保存最新的消息,保证业务系统A与业务系统B,数据的一致性,增加了业务处理的难度。

    1.2、顺序消息生产过程分析

    image

    我们在生产消息的时候可以将同一个消息ID的消息放入到相同的队列中,保证同一类需要顺序消费的消息放入到同一个队列中,这样队列中的消息就是有序的。但是同时也需要保证消息的消费也是有序的才可以保证消息的顺序消费。

    集群模式下同一个消费组内的消费者共同承担其订阅主题下的消息队列的消费,同一个消息消息队列在同一时刻只会被消费组内的一个消费者消费,一个消费者同一时刻可以分配多个消费队列。

    集群模式下的普通消息,线程池默认创建20个(可配置)线程。多线程从队列中拉取消息,提高并发加快消息的消费。

    集群模式下的顺序消息,顺序消费是单线程,一个线程只能去一个队列获取数据,当需要获取某个队列中的消息时,需要锁定该消息队列(PS:后面会根据源码详细分析其原理)。

    广播模式下的顺序消息,顺序消费是单线程,直接进行消费,无需锁定消息队列,因为相互之间无竞争

    2、编写Producer

    2.1、自定义队列选择器

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("order_group_test_1");
        //Launch the instance.
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("order_test_1", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
    
            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
    
    

    我们根据100个消息的序号来放入到不同的队列中,根据序号%10取模,相同的放入到一个队列中。

    2.2、提供的队列选择器

    上面是我们实现自定义队列选择器的算法,RocketMQ也提供了三种队列选择算法

    image

    从图中我们可以看到一共三种

    • SelectMessageQueueByHash:通过 hash 进行选择 queue。
    • SelectMessageQueueByRandom:随机选择 queue。
    • SelectMessageQueueByMachineRoom:机房选择queue(未实现)

    我们分别来看一下实现

    2.2.1、SelectMessageQueueByHash

    public class SelectMessageQueueByHash implements MessageQueueSelector {
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = arg.hashCode();
            if (value < 0) {
                value = Math.abs(value);
            }
            value %= mqs.size();
            return ((MessageQueue) mqs.get(value));
        }
    }
    
    

    我们差看源码可以发现,通过提供的参数获取其HashCode,如果为负值则取绝对值,hash值与队列的总数进行取模获取其队列。

    2.2.2、SelectMessageQueueByRandom

    public class SelectMessageQueueByRandom implements MessageQueueSelector {
        private Random random;
    
        public SelectMessageQueueByRandom() {
            this.random = new Random(System.currentTimeMillis());
        }
    
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = this.random.nextInt(mqs.size());
            return ((MessageQueue) mqs.get(value));
        }
    }
    
    

    生成一个队列数以内的随机数,通过随机数获取队列。

    2.2.3、SelectMessageQueueByMachineRoom(未实现)

    public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
        private Set<String> consumeridcs;
    
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            return null;
        }
    
        public Set<String> getConsumeridcs() {
            return this.consumeridcs;
        }
    
        public void setConsumeridcs(Set<String> consumeridcs) {
            this.consumeridcs = consumeridcs;
        }
    }
    
    

    我们发现其select方法为null,其实是没有进行实现。需要我们自己实现。

    虽然RocketMQ提供了三种(其实2种,SelectMessageQueueByMachineRoom未实现)队列选择算法,但是不建议使用,不同的业务规则其选择队列的算法也不尽相同,建议手动实现。

    3、编写Consumer

    public static void main(String[] args){
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("order_consumer_test_push");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("order_test_1", "*");
            consumer.registerMessageListener(new MessageListenerOrderly(){
    
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeOrderlyContext paramConsumeOrderlyContext) {
                    try {
                        for(MessageExt msg : paramList){
                            String msgbody = new String(msg.getBody(), "utf-8");
                            System.out.println("  MessageBody: "+ msgbody);//输出消息内容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    return ConsumeOrderlyStatus.SUCCESS; //消费成功
                }
            });
            consumer.start();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    

    查看结果

    image

    我们找到两个典型的一组数字,尾号是6和9的,尾号是6的计较集中,尾号是9的比较分散,但是结果都是一样的,按照顺序消费的。

    相关文章

      网友评论

        本文标题:RocketMQ顺序消息

        本文链接:https://www.haomeiwen.com/subject/rqlumhtx.html