批量消息

作者: 念䋛 | 来源:发表于2021-06-19 09:09 被阅读0次

    批量消息
    批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞 吐量。
    相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次 发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB
    实际使用时,这个1MB的限制可以稍微扩大点,实际大的限制是4194304字节,大概4MB。但 是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限 制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
    简单的发送

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
    
        producer.send(messages);
        producer.shutdown();
    }
    
    

    如果不能确定发送的消息是否大于1M,可以把list分为若干个小于1兆的list

    public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
            producer.start();
    
            //large batch
            String topic = "BatchTest";
            List<Message> messages = new ArrayList<>(100 * 1000);
            for (int i = 0; i < 100 * 1000; i++) {
                messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
            }
            //把大的list 分为小于1000*1000的若干个list
            ListSplitter splitter = new ListSplitter(messages);
            while (splitter.hasNext()) {
                List<Message> listItem = splitter.next();
                producer.send(listItem);
            }
            producer.shutdown();
        }
    
    }
    
    class ListSplitter implements Iterator<List<Message>> {
        private int sizeLimit = 1000 * 1000;
        private final List<Message> messages;
        private int currIndex;
    
        public ListSplitter(List<Message> messages) {
            this.messages = messages;
        }
    
        @Override
        public boolean hasNext() {
            return currIndex < messages.size();
        }
    
        @Override
        public List<Message> next() {
            int nextIndex = currIndex;
            int totalSize = 0;
            for (; nextIndex < messages.size(); nextIndex++) {
                Message message = messages.get(nextIndex);
                //计算每个消息的大小,topic名称+消息体+属性
                int tmpSize = message.getTopic().length() + message.getBody().length;
                Map<String, String> properties = message.getProperties();
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    tmpSize += entry.getKey().length() + entry.getValue().length();
                }
                //日志的开销
                tmpSize = tmpSize + 20;
                //如果大于定义的大小
                if (tmpSize > sizeLimit) {
                    //单个消息超过了最大的限制
                    //忽略,否则会阻塞分裂的进程
                    if (nextIndex - currIndex == 0) {
                        //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                        nextIndex++;
                    }
                    break;
                }
                if (tmpSize + totalSize > sizeLimit) {
                    break;
                } else {
                    totalSize += tmpSize;
                }
    
            }
            List<Message> subList = messages.subList(currIndex, nextIndex);
            currIndex = nextIndex;
            return subList;
        }
    
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Not allowed to remove");
        }
    
    

    相关文章

      网友评论

        本文标题:批量消息

        本文链接:https://www.haomeiwen.com/subject/wfthyltx.html