美文网首页rocketMq理论与实践
RocketMQ producer 有序消息

RocketMQ producer 有序消息

作者: 晴天哥_王志 | 来源:发表于2020-05-03 06:51 被阅读0次

    系列

    开篇

    • 这个系列的主要目的是介绍RocketMq producer的原理和用法,在这个系列当中会介绍 producer的启动流程、producer的路由同步、producer的消息发送流程,producer的有序消息。

    • 这篇文章介绍producer的有序消息,介绍producer的有序消息实现原理。

    producer举例

    public class Producer {
        public static void main(String[] args) throws UnsupportedEncodingException {
            try {
                MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                producer.start();
    
                String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
                for (int i = 0; i < 100; i++) {
                    int orderId = i % 10;
                    Message msg =
                        new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
    
                    System.out.printf("%s%n", sendResult);
                }
    
                producer.shutdown();
            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • producer发送有序消息核心在于定义MessageQueueSelector并指定指定hash key(如上图中的orderId)。
    • hash key 主要目的是解决根据唯一标识来解决hash键的计算。
    • MessageQueueSelector 主要目的是解决MessageQueue的选择策略。

    selector的作用时期

    public class DefaultMQProducerImpl implements MQProducerInner {
    
        private SendResult sendSelectImpl(
            Message msg,
            MessageQueueSelector selector,
            Object arg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback, final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            long beginStartTime = System.currentTimeMillis();
            this.makeSureStateOK();
            Validators.checkMessage(msg, this.defaultMQProducer);
    
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                MessageQueue mq = null;
                try {
                    List<MessageQueue> messageQueueList =
                        mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
                    Message userMessage = MessageAccessor.cloneMessage(msg);
                    String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
                    userMessage.setTopic(userTopic);
                    // selector.select负责选择指定的MessageQueue发送消息
                    mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
                } catch (Throwable e) {
                    throw new MQClientException("select message queue throwed exception.", e);
                }
    
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout < costTime) {
                    throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
                }
                if (mq != null) {
                    return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
                } else {
                    throw new MQClientException("select message queue return null.", null);
                }
            }
    
            validateNameServerSetting();
            throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
        }
    }
    
    • 在RocketMq发送消息过程中会通过selector.select()来实现MessageQueue的选择。
    • selector的实现分为系统定义和自定义两种。

    selector的分类

    系统定义selector

    public class SelectMessageQueueByHash implements MessageQueueSelector {
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = arg.hashCode();
            if (value < 0) {
                value = Math.abs(value);
            }
    
            value = value % mqs.size();
            return mqs.get(value);
        }
    }
    
    
    public class SelectMessageQueueByRandom implements MessageQueueSelector {
        private Random random = new Random(System.currentTimeMillis());
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = random.nextInt(mqs.size());
            return mqs.get(value);
        }
    }
    
    • SelectMessageQueueByHash根据hash key取hash_code,然后进行取模选择指定的MessageQueue。
    • SelectMessageQueueByRandom主要是随机获取MessageQueue。
    public class Producer {
        public static void main(String[] args) throws UnsupportedEncodingException {
            try {
                MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                producer.start();
    
                String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
                for (int i = 0; i < 100; i++) {
                    int orderId = i % 10;
                    Message msg =
                        new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
    
                    System.out.printf("%s%n", sendResult);
                }
    
                producer.shutdown();
            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 自定义的selector本质上直接用hash key直接取模来选择MessageQueue。

    相关文章

      网友评论

        本文标题:RocketMQ producer 有序消息

        本文链接:https://www.haomeiwen.com/subject/hcvrghtx.html