美文网首页
AllocateMessageQueueAveragely的mq

AllocateMessageQueueAveragely的mq

作者: hei禹 | 来源:发表于2020-07-18 22:40 被阅读0次

    测试数据:
    topic为:topic_a
    consume group为:group
    broker为:broker_a
    consumer的cid分别为:cid_0,cid_1,...
    queue id分别为:0,1,...

    代码测试的是,3个mq,在2/3/4个consumer情况下的mq分配情况

    public class StrategyTest {
        public static void allocate(AllocateMessageQueueStrategy strategy, int mqSize, int cidSize) {
            List<String> cidList = new ArrayList<String>();
            for (int i = 0; i < cidSize; i++) {
                cidList.add("cid_" + i);
            }
            List<MessageQueue> mqList = new ArrayList<MessageQueue>(mqSize);
            for (int i = 0; i < mqSize; i++) {
                mqList.add(makeMq(i));
            }
            for (int i = 0; i < cidSize; i++) {
                doTest(strategy, i, cidList, mqList);
            }
            System.out.println("-----");
        }
        private static void doTest(AllocateMessageQueueStrategy strategy, int currentCid, List<String> cidList, List<MessageQueue> mqList) {
            List<MessageQueue> result = strategy.allocate("group","cid_" + currentCid, mqList, cidList);
            List<Integer> queueIdList = new ArrayList<>(result.size());
            for (MessageQueue mq : result) {
                queueIdList.add(mq.getQueueId());
            }
            System.out.println("cid: " + currentCid + " queueId: " + queueIdList);
        }
        private static MessageQueue makeMq(int queueId) {
            MessageQueue q = new MessageQueue();
            q.setTopic("topic_a");
            q.setBrokerName("broker_a");
            q.setQueueId(queueId);
            return q;
        }
        public static void main(String[] args) {
            test(new AllocateMessageQueueAveragely());
            test(new AllocateMessageQueueAveragelyByCircle());
        }
        private static void test(AllocateMessageQueueStrategy strategy) {
            System.out.println(strategy.getClass().getSimpleName());
            System.out.println(" ");
            StrategyTest.allocate(strategy, 3, 2);
            StrategyTest.allocate(strategy, 3, 3);
            StrategyTest.allocate(strategy, 3, 4);
            System.out.println("|||||");
        }
    }
    

    结果如下面所示:

    cid: 0 queueId: [0, 1]
    cid: 1 queueId: [2]
    -----
    cid: 0 queueId: [0]
    cid: 1 queueId: [1]
    cid: 2 queueId: [2]
    -----
    cid: 0 queueId: [0]
    cid: 1 queueId: [1]
    cid: 2 queueId: [2]
    cid: 3 queueId: []
    
    分配示意图.png

    mqadmin工具提供了一个allocateMQ子命令,通过其我们可以预览某个Topic在多个消费者分区是如何分配的,使用方式如下:

    sh bin/mqadmin allocateMQ -i ip1,ip2,ip3 -t TopicA -n localhost:9876
    

    这个工具可以将模拟分配的结果进行json格式展示。

    相关文章

      网友评论

          本文标题:AllocateMessageQueueAveragely的mq

          本文链接:https://www.haomeiwen.com/subject/cbkqkktx.html