美文网首页
ActiveMQ的消息分组机制会造成OOM吗?

ActiveMQ的消息分组机制会造成OOM吗?

作者: 超人也害羞 | 来源:发表于2020-08-03 19:32 被阅读0次

猜想

AcitveMQ依赖Message中JMSXGroupID属性来做消息分组,那分组信息会怎么维护呢?刚开始猜想是维护在类似Map的数据结构中?但是如果消息分组很细,例如按照订单号来分组,势必MQ需要维护大量的分组信息,那这样不是会导致OOM?

消息分组接口

找了一下,最后发现ActiveMQ使用MessageGroupMap接口定义消息分组的相关逻辑

 * Represents a map of JMSXGroupID values to consumer IDs
 */
public interface MessageGroupMap {
    void put(String groupId, ConsumerId consumerId);
    ConsumerId get(String groupId);
    ConsumerId removeGroup(String groupId);
    MessageGroupSet removeConsumer(ConsumerId consumerId);
    void removeAll();
    /**
     * @return  a map of group names and associated consumer Id
     */
    Map<String,String> getGroups();
    String getType();
    void setDestination(Destination destination);
}

MQ 自身有三个MessageGroupMap接口的实现类,包括CachedMessageGroupMap,MessageGroupHashBucket,SimpleMessageGroupMap三个。


MessageGroupMap实现类

然后通过监控MQ的堆内存,发现实际上是使用了CachedMessageGroupMap


ActiveMQ-内存监控

看看CachedMessageGroupMap的实现

/**
 * A simple implementation which tracks every individual GroupID value in a LRUCache
 */
public class CachedMessageGroupMap implements MessageGroupMap {
    private final LRUMap<String, ConsumerId> cache;
    private final int maximumCacheSize;
    Destination destination;

    CachedMessageGroupMap(int size){
      cache = new LRUMap<String, ConsumerId>(size) {
          @Override
          public boolean removeEldestEntry(final Map.Entry eldest) {
              boolean remove = super.removeEldestEntry(eldest);
              if (remove) {
                  if (destination != null) {
                      for (Subscription s : destination.getConsumers()) {
                        if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) {
                            s.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination());
                            break;
                          }
                      }
                  }
              }
              return remove;
          }
      };
      maximumCacheSize = size;
    }
    public synchronized void put(String groupId, ConsumerId consumerId) {
        cache.put(groupId, consumerId);
    }

    public synchronized ConsumerId get(String groupId) {
        return cache.get(groupId);
    }

    public synchronized ConsumerId removeGroup(String groupId) {
        return cache.remove(groupId);
    }

    public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
        SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
        Map<String,ConsumerId> map = new HashMap<String, ConsumerId>();
        map.putAll(cache);
        for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) {
            String group = iter.next();
            ConsumerId owner = map.get(group);
            if (owner.equals(consumerId)) {
                ownedGroups.add(group);
            }
        }
        for (String group:ownedGroups.getUnderlyingSet()){
            cache.remove(group);
        }
        return ownedGroups;
    }


    @Override
    public synchronized void removeAll(){
        cache.clear();
        if (destination != null) {
            for (Subscription s : destination.getConsumers()) {
                s.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination());
            }
        }
    }

    @Override
    public synchronized Map<String, String> getGroups() {
        Map<String,String> result = new HashMap<String,String>();
        for (Map.Entry<String,ConsumerId>entry: cache.entrySet()){
            result.put(entry.getKey(),entry.getValue().toString());
        }
        return result;
    }

    @Override
    public String getType() {
        return "cached";
    }

    public int getMaximumCacheSize(){
        return maximumCacheSize;
    }

    public String toString() {
        return "message groups: " + cache.size();
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}

主要关注点还是在CachedMessageGroupMap中存储的结构——LRUMap,原来是利用LinkedHashMap实现了一个LRUMap,如果分组信息过多,会剔除最老的分组信息。

public class LRUMap<K,V> extends LinkedHashMap<K,V>{

    protected static final float DEFAULT_LOAD_FACTOR = (float) 0.75;
    protected static final int DEFAULT_INITIAL_CAPACITY = 5000;
    private static final long serialVersionUID = -9179676638408888162L;

    private int maximumSize;

    public LRUMap(int maximumSize) {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, true, maximumSize);
    }

    public LRUMap(int maximumSize, boolean accessOrder) {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, accessOrder, maximumSize);
    }

    public LRUMap(int initialCapacity, float loadFactor, boolean accessOrder, int maximumSize) {
        super(initialCapacity, loadFactor, accessOrder);
        this.maximumSize = maximumSize;
    }

    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maximumSize;
    }
}

end~~

相关文章

网友评论

      本文标题:ActiveMQ的消息分组机制会造成OOM吗?

      本文链接:https://www.haomeiwen.com/subject/odmkrktx.html