猜想
ActiveMQ依赖Message中的JMSXGroupID属性来做消息分组,那分组信息是怎么维护的呢?刚开始猜想是维护在类似Map的数据结构中。但是如果消息分组很细,例如按照订单号来分组,MQ势必需要维护大量的分组信息,这样不会导致OOM吗?
消息分组接口
找了一下,最后发现ActiveMQ使用MessageGroupMap接口定义消息分组的相关逻辑
/**
 * Represents a map of JMSXGroupID values to consumer IDs
 */
public interface MessageGroupMap {
/**
 * Records that the given group is owned by the given consumer.
 *
 * @param groupId the JMSXGroupID value
 * @param consumerId the consumer that the group is assigned to
 */
void put(String groupId, ConsumerId consumerId);
/**
 * Looks up the consumer that owns a group.
 *
 * @param groupId the JMSXGroupID value
 * @return the owning consumer id (CachedMessageGroupMap returns null when the
 *         group is not tracked; other implementations presumably do the same - TODO confirm)
 */
ConsumerId get(String groupId);
/**
 * Removes the assignment for a single group.
 *
 * @param groupId the JMSXGroupID value to forget
 * @return the consumer id that was associated with the group, as reported by the implementation
 */
ConsumerId removeGroup(String groupId);
/**
 * Removes the group assignments held by a consumer.
 *
 * @param consumerId the consumer whose groups should be released
 * @return the set of groups that were owned by the consumer
 *         (this is what CachedMessageGroupMap returns; verify for other implementations)
 */
MessageGroupSet removeConsumer(ConsumerId consumerId);
/**
 * Removes every group assignment held by this map.
 */
void removeAll();
/**
* @return a map of group names and associated consumer Id
*/
Map<String,String> getGroups();
/**
 * @return a short name identifying the implementation (CachedMessageGroupMap returns "cached")
 */
String getType();
/**
 * Supplies the destination this map belongs to.
 *
 * @param destination the destination whose consumers the implementation may consult
 */
void setDestination(Destination destination);
}
ActiveMQ自身有三个MessageGroupMap接口的实现类:CachedMessageGroupMap、MessageGroupHashBucket和SimpleMessageGroupMap。

然后通过监控MQ的堆内存,发现实际上是使用了CachedMessageGroupMap

看看CachedMessageGroupMap的实现
/**
 * A {@link MessageGroupMap} that keeps every group id in a size-bounded {@link LRUMap}.
 *
 * <p>When the underlying map evicts its eldest entry and a destination has been set,
 * the subscription whose consumer id equals the evicted owner has its assigned-group
 * count decremented for that destination.
 *
 * <p>All methods that touch the cache are {@code synchronized} on this instance;
 * {@link #toString()}, {@link #getMaximumCacheSize()}, {@link #getType()} and
 * {@link #setDestination(Destination)} are not.
 */
public class CachedMessageGroupMap implements MessageGroupMap {
    private final LRUMap<String, ConsumerId> cache;
    private final int maximumCacheSize;
    Destination destination;

    /**
     * @param size maximum number of group assignments retained before the eldest is evicted
     */
    CachedMessageGroupMap(int size){
        maximumCacheSize = size;
        cache = new LRUMap<String, ConsumerId>(size) {
            @Override
            public boolean removeEldestEntry(final Map.Entry eldest) {
                if (!super.removeEldestEntry(eldest)) {
                    return false;
                }
                if (destination == null) {
                    // No destination yet, so there is no subscription to update.
                    return true;
                }
                // Only the first subscription matching the evicted owner is updated.
                for (Subscription subscription : destination.getConsumers()) {
                    if (subscription.getConsumerInfo().getConsumerId().equals(eldest.getValue())) {
                        subscription.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination());
                        break;
                    }
                }
                return true;
            }
        };
    }

    /** Stores (or overwrites) the owner of {@code groupId}. */
    public synchronized void put(String groupId, ConsumerId consumerId) {
        cache.put(groupId, consumerId);
    }

    /** @return the cached owner of {@code groupId}, or {@code null} if the cache has no entry */
    public synchronized ConsumerId get(String groupId) {
        return cache.get(groupId);
    }

    /** @return the owner that was cached for {@code groupId}, or {@code null} if there was none */
    public synchronized ConsumerId removeGroup(String groupId) {
        return cache.remove(groupId);
    }

    /**
     * Drops every cached group owned by {@code consumerId}.
     *
     * @return the set of group ids that were owned by the consumer
     */
    public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
        SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
        // Scan a snapshot so the cache itself is not touched while collecting matches.
        Map<String, ConsumerId> snapshot = new HashMap<String, ConsumerId>(cache);
        for (Map.Entry<String, ConsumerId> assignment : snapshot.entrySet()) {
            if (assignment.getValue().equals(consumerId)) {
                ownedGroups.add(assignment.getKey());
            }
        }
        for (String ownedGroup : ownedGroups.getUnderlyingSet()) {
            cache.remove(ownedGroup);
        }
        return ownedGroups;
    }

    /**
     * Empties the cache and, if a destination is set, clears the assigned-group count
     * of every consumer of that destination.
     */
    @Override
    public synchronized void removeAll(){
        cache.clear();
        if (destination == null) {
            return;
        }
        for (Subscription subscription : destination.getConsumers()) {
            subscription.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination());
        }
    }

    /**
     * @return a new map from each cached group id to the string form of its owning consumer id
     */
    @Override
    public synchronized Map<String, String> getGroups() {
        Map<String, String> groups = new HashMap<String, String>();
        // Entry iteration only: calling get() on the access-ordered cache would reorder it mid-loop.
        for (Map.Entry<String, ConsumerId> assignment : cache.entrySet()) {
            String owner = assignment.getValue().toString();
            groups.put(assignment.getKey(), owner);
        }
        return groups;
    }

    /** @return the fixed type name {@code "cached"} */
    @Override
    public String getType() {
        return "cached";
    }

    /** @return the size limit this map was constructed with */
    public int getMaximumCacheSize(){
        return maximumCacheSize;
    }

    /** @return a short description containing the current number of cached groups */
    @Override
    public String toString() {
        return "message groups: " + cache.size();
    }

    /** Sets the destination whose consumers are updated on eviction and on {@link #removeAll()}. */
    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}
主要关注点还是CachedMessageGroupMap中的存储结构——LRUMap。原来它是基于LinkedHashMap实现的(默认accessOrder为true,即按访问顺序排列):当分组数量超过maximumSize时,会剔除最久未被访问的分组信息,因此分组信息不会无限增长。
/**
 * A {@link LinkedHashMap} that discards its eldest entry whenever an insertion
 * pushes the size above a fixed maximum.
 *
 * <p>With {@code accessOrder == true} (the default of {@link #LRUMap(int)}) the eldest
 * entry is the least recently accessed one; with {@code false} it is the least
 * recently inserted one.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class LRUMap<K,V> extends LinkedHashMap<K,V>{
    protected static final float DEFAULT_LOAD_FACTOR = 0.75f;
    protected static final int DEFAULT_INITIAL_CAPACITY = 5000;
    private static final long serialVersionUID = -9179676638408888162L;

    /** Largest size the map may have once an insertion has completed. */
    private int maximumSize;

    /**
     * Creates an access-ordered map holding at most {@code maximumSize} entries.
     *
     * @param maximumSize the size above which the eldest entry is evicted
     */
    public LRUMap(int maximumSize) {
        this(maximumSize, true);
    }

    /**
     * Creates a map with the default initial capacity and load factor.
     *
     * @param maximumSize the size above which the eldest entry is evicted
     * @param accessOrder {@code true} for access order, {@code false} for insertion order
     */
    public LRUMap(int maximumSize, boolean accessOrder) {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, accessOrder, maximumSize);
    }

    /**
     * @param initialCapacity initial capacity passed to {@link LinkedHashMap}
     * @param loadFactor load factor passed to {@link LinkedHashMap}
     * @param accessOrder {@code true} for access order, {@code false} for insertion order
     * @param maximumSize the size above which the eldest entry is evicted
     */
    public LRUMap(int initialCapacity, float loadFactor, boolean accessOrder, int maximumSize) {
        super(initialCapacity, loadFactor, accessOrder);
        this.maximumSize = maximumSize;
    }

    /**
     * Asks for eviction as soon as the map holds more than the configured maximum.
     *
     * @param eldest the entry that would be removed; not inspected here
     * @return {@code true} if the current size exceeds the maximum
     */
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        final int currentSize = size();
        return maximumSize < currentSize;
    }
}
end~~
网友评论