系列
开篇
-
这个系列主要用来讲解RocketMq的一些常用功能的实现原理,包括 消息过滤、广播模式消费、事务消息、同步调用等功能。
-
这篇文章主要是用来讲解RocketMq的效率过滤的实现逻辑。
消息过滤的结论
-
Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。
-
Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
消息存储生成tagsCode
public class CommitLog {
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) {
try {
if (propertiesLength > 0) {
String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
if (tags != null && tags.length() > 0) {
tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
}
}
return new DispatchRequest(
topic,queueId,physicOffset,totalSize,tagsCode,storeTimestamp,
queueOffset,keys,uniqKey,sysFlag,preparedTransactionOffset,
propertiesMap
);
} catch (Exception e) {
}
return new DispatchRequest(-1, false /* success */);
}
}
public class MessageExtBrokerInner extends MessageExt {
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (null == tags || tags.length() == 0) { return 0; }
return tags.hashCode();
}
}
- DispatchRequest过程中通过tagsString2tagsCode将tagsString转为tagsCode。
- DispatchRequest用于consumeQueue的存储,consumeQueue保存了tagsCode。
broker处理消息消费
public class DefaultMessageStore implements MessageStore {
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
// 遍历ConsumeQueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
if (maxOffset == 0) {
// 省略代码
} else {
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
// 省略嗲吗
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
// 省略代码
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
}
} finally {
bufferConsumeQueue.release();
}
} else {
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
return getResult;
}
}
- broker在处理consumer消费消息过程中首先查找ConsumeQueue获取对象。
- 针对consumeQueue当中的对象tagsCode和consumer的订阅信息进行比较,subscriptionData.getCodeSet().contains(tagsCode.intValue())。
broker的tags的匹配逻辑
public class ExpressionMessageFilter implements MessageFilter {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
protected final SubscriptionData subscriptionData;
protected final ConsumerFilterData consumerFilterData;
protected final ConsumerFilterManager consumerFilterManager;
protected final boolean bloomDataValid;
public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
ConsumerFilterManager consumerFilterManager) {
this.subscriptionData = subscriptionData;
this.consumerFilterData = consumerFilterData;
this.consumerFilterManager = consumerFilterManager;
if (consumerFilterData == null) {
bloomDataValid = false;
return;
}
BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {
bloomDataValid = true;
} else {
bloomDataValid = false;
}
}
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null) {
return true;
}
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
} else {
// no expression or no bloom
if (consumerFilterData == null || consumerFilterData.getExpression() == null
|| consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
return true;
}
// message is before consumer
if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
return true;
}
byte[] filterBitMap = cqExtUnit.getFilterBitMap();
BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
if (filterBitMap == null || !this.bloomDataValid
|| filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
return true;
}
BitsArray bitsArray = null;
try {
bitsArray = BitsArray.create(filterBitMap);
boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
return ret;
} catch (Throwable e) {
log.error("bloom filter error, sub=" + subscriptionData
+ ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
}
}
return true;
}
}
- ExpressionType.isTagType(subscriptionData.getExpressionType())针对Tag类型比较。
- subscriptionData.getCodeSet().contains(tagsCode.intValue())执行的比较匹配逻辑。
consumer的tags匹配逻辑
public class PullAPIWrapper {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String consumerGroup;
private final boolean unitMode;
private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
private volatile boolean connectBrokerByUser = false;
private volatile long defaultBrokerId = MixAll.MASTER_ID;
private Random random = new Random(System.currentTimeMillis());
private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {
this.mQClientFactory = mQClientFactory;
this.consumerGroup = consumerGroup;
this.unitMode = unitMode;
}
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
// 针对msg的tags信息和subscriptionData的tags信息进行比较
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
if (this.hasHook()) {
FilterMessageContext filterMessageContext = new FilterMessageContext();
filterMessageContext.setUnitMode(unitMode);
filterMessageContext.setMsgList(msgListFilterAgain);
this.executeHook(filterMessageContext);
}
for (MessageExt msg : msgListFilterAgain) {
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(traFlag)) {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
}
pullResultExt.setMessageBinary(null);
return pullResult;
}
}
- consumer侧在PullAPIWrapper#processPullResult处理消息过滤处理。
- 针对subscriptionData.getTagsSet().contains(msg.getTags())来过滤符合订阅tags信息的消息。
网友评论