1、简单的生产消息
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
String topic = "BatchTest";
List messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
}
2、将消息拆分发送
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); //large batch String topic = "BatchTest2";
List messages = new ArrayList<>(100 * 10000);
for (int i = 0; i < 100 * 10000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}//split the large batch into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List listItem = splitter.next();
producer.send(listItem);
Thread.sleep(4000);
}
}
}
大家可以定制自己的发送策略
class ListSplitter implements Iterator> {
private int sizeLimit = 1000 * 1000; private final List messages; private int currIndex; public ListSplitter(List messages) {
this.messages = messages;
}
@Override public boolean hasNext() { return currIndex < messages.size(); }
@Override public List next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Mapproperties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > sizeLimit) {
//it is unexpected that single message exceeds the sizeLimit
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
} List subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
网友评论