RocketMQ Message Tag Filtering

Author: 晴天哥_王志 | Published: 2020-05-09 13:01

    Series

    Introduction

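    • This series explains how some commonly used RocketMQ features are implemented, including message filtering, broadcast-mode consumption, transactional messages, and synchronous calls.

    • This article explains how RocketMQ implements message filtering by tag.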

    Summary of message filtering

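    • Tag filtering: when subscribing, the consumer can specify tags in addition to the topic; to subscribe to several tags, separate them with ||. The consumer builds a SubscriptionData from the subscription and sends it to the broker as part of its pull request.

    • Before reading from Store, RocketMQ's file storage layer, the broker uses this subscription data to build a MessageFilter and passes it to Store. For each entry Store reads from the ConsumeQueue, it filters on the tag hash recorded in that entry. Because the broker compares only hash codes, it cannot match the original tag string exactly. After pulling messages, the consumer therefore compares each message's original tag string with its subscription and discards any message that does not match, without consuming it.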
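
    The two-stage check described above can be sketched in plain Java. This is a minimal illustration, not RocketMQ code: the class TwoStageTagFilterSketch and its methods brokerMatches and consumerMatches are hypothetical names. It relies on the fact that the strings "Aa" and "BB" have the same String.hashCode(), so a message tagged BB passes the hash check for a subscription to Aa and is rejected only by the string comparison.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative sketch only; none of these names come from RocketMQ. */
class TwoStageTagFilterSketch {

    /** Stage 1 (broker side, assumed): only the tag's hash code is available. */
    static boolean brokerMatches(Set<Integer> subscribedCodes, long tagsCode) {
        return subscribedCodes.contains((int) tagsCode);
    }

    /** Stage 2 (consumer side, assumed): the full tag string is available. */
    static boolean consumerMatches(Set<String> subscribedTags, String tag) {
        return tag != null && subscribedTags.contains(tag);
    }

    public static void main(String[] args) {
        // Subscription to the single tag "Aa".
        Set<String> tags = new HashSet<>();
        tags.add("Aa");
        Set<Integer> codes = new HashSet<>();
        for (String t : tags) {
            codes.add(t.hashCode());
        }

        // "BB" collides with "Aa": both hash to 2112.
        String colliding = "BB";
        System.out.println(brokerMatches(codes, colliding.hashCode())); // true
        System.out.println(consumerMatches(tags, colliding));           // false
    }
}
```

    Collisions like this are rare for realistic tag names, which is why the cheap hash comparison on the broker removes almost all unwanted messages and the exact comparison on the consumer only has to catch the remainder.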

    Generating tagsCode when a message is stored

    public class CommitLog {
    
        public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
            final boolean readBody) {
    
            try {
                if (propertiesLength > 0) {
                    String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
                    if (tags != null && tags.length() > 0) {
                        tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
                    }
                }
    
                return new DispatchRequest(
                    topic,queueId,physicOffset,totalSize,tagsCode,storeTimestamp,
                    queueOffset,keys,uniqKey,sysFlag,preparedTransactionOffset,
                    propertiesMap
                );
            } catch (Exception e) {
            }
    
            return new DispatchRequest(-1, false /* success */);
        }
    }
    
    
    public class MessageExtBrokerInner extends MessageExt {
    
        public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
            if (null == tags || tags.length() == 0) { return 0; }
    
            return tags.hashCode();
        }
    }
    
    • While the DispatchRequest is being built, tagsString2tagsCode converts the tag string into a tagsCode.
    • The DispatchRequest is then used to write the ConsumeQueue entry, so the ConsumeQueue stores the tagsCode.
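
    To make the stored form concrete, here is a small sketch of a single ConsumeQueue entry. It assumes the layout implied by the read order in DefaultMessageStore#getMessage (an 8-byte CommitLog offset, a 4-byte message size, an 8-byte tagsCode, 20 bytes in total). ConsumeQueueUnitSketch, tagsCodeOf and encode are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

/** Illustrative sketch of one ConsumeQueue entry; not RocketMQ code. */
class ConsumeQueueUnitSketch {

    // Assumed layout: long offset (8) + int size (4) + long tagsCode (8).
    static final int UNIT_SIZE = 8 + 4 + 8;

    /** Mirrors the idea of tagsString2tagsCode: the int hash is widened to a long. */
    static long tagsCodeOf(String tags) {
        if (tags == null || tags.isEmpty()) {
            return 0;
        }
        return tags.hashCode();
    }

    /** Writes one entry and returns the buffer ready for reading. */
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(size).putLong(tagsCode);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer unit = encode(1024L, 256, tagsCodeOf("TagA"));

        // Read the fields back in the same order as they were written.
        long offsetPy = unit.getLong();
        int sizePy = unit.getInt();
        long tagsCode = unit.getLong();

        // prints "1024 256 true"
        System.out.println(offsetPy + " " + sizePy + " " + (tagsCode == "TagA".hashCode()));
    }
}
```

    Because each entry carries only the hash, the broker can decide whether to skip a message without touching the CommitLog, where the full message and its tag string live.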

    How the broker handles message consumption

    public class DefaultMessageStore implements MessageStore {
    
        public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
            final int maxMsgNums,
            final MessageFilter messageFilter) {
            
            // Walk the ConsumeQueue
            ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
            if (consumeQueue != null) {
                minOffset = consumeQueue.getMinOffsetInQueue();
                maxOffset = consumeQueue.getMaxOffsetInQueue();
    
                if (maxOffset == 0) {
                    // code omitted
                } else {
                    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                    if (bufferConsumeQueue != null) {
                        try {
                            // code omitted
                            for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                                int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                                long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
    
                                // code omitted
    
                                if (messageFilter != null
                                    && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
    
                                    continue;
                                }
    
                                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            }
                        } finally {
                            bufferConsumeQueue.release();
                        }
                    } else {
                    }
                }
            } else {
                status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            }
    
            return getResult;
        }
    }
    
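    • To serve a consumer's pull request, the broker first looks up the ConsumeQueue for the topic and queue and reads entries from it.
    • It compares the tagsCode of each ConsumeQueue entry with the consumer's subscription through subscriptionData.getCodeSet().contains(tagsCode.intValue()).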

    Tag matching logic on the broker

    public class ExpressionMessageFilter implements MessageFilter {
    
        protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
    
        protected final SubscriptionData subscriptionData;
        protected final ConsumerFilterData consumerFilterData;
        protected final ConsumerFilterManager consumerFilterManager;
        protected final boolean bloomDataValid;
    
        public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
            ConsumerFilterManager consumerFilterManager) {
            this.subscriptionData = subscriptionData;
            this.consumerFilterData = consumerFilterData;
            this.consumerFilterManager = consumerFilterManager;
            if (consumerFilterData == null) {
                bloomDataValid = false;
                return;
            }
            BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
            if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {
                bloomDataValid = true;
            } else {
                bloomDataValid = false;
            }
        }
    
        @Override
        public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
            if (null == subscriptionData) {
                return true;
            }
    
            if (subscriptionData.isClassFilterMode()) {
                return true;
            }
    
            // by tags code.
            if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
    
                if (tagsCode == null) {
                    return true;
                }
    
                if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
                    return true;
                }
    
                return subscriptionData.getCodeSet().contains(tagsCode.intValue());
            } else {
                // no expression or no bloom
                if (consumerFilterData == null || consumerFilterData.getExpression() == null
                    || consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
                    return true;
                }
    
                // message is before consumer
                if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
                    log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
                    return true;
                }
    
                byte[] filterBitMap = cqExtUnit.getFilterBitMap();
                BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
                if (filterBitMap == null || !this.bloomDataValid
                    || filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
                    return true;
                }
    
                BitsArray bitsArray = null;
                try {
                    bitsArray = BitsArray.create(filterBitMap);
                    boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
                    log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
                    return ret;
                } catch (Throwable e) {
                    log.error("bloom filter error, sub=" + subscriptionData
                        + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
                }
            }
    
            return true;
        }
    }
    
    • ExpressionType.isTagType(subscriptionData.getExpressionType()) selects the tag-based comparison branch.
    • subscriptionData.getCodeSet().contains(tagsCode.intValue()) performs the actual match.
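
    The tag branch above depends on a code set that has to be derived from the subscription expression. The sketch below shows one plausible way to build it and apply the same three checks. It is an assumption-laden illustration, not the library's implementation: SubscriptionSketch and matchesByTagsCode are hypothetical names, and the splitting and trimming rules are assumed.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative sketch of a tag subscription; not RocketMQ code. */
class SubscriptionSketch {

    static final String SUB_ALL = "*";

    final Set<String> tagsSet = new HashSet<>();
    final Set<Integer> codeSet = new HashSet<>();
    final String subString;

    /** Parses an expression such as "TagA || TagB" (parsing rules are assumed). */
    SubscriptionSketch(String expression) {
        if (expression == null || expression.trim().isEmpty()
                || SUB_ALL.equals(expression.trim())) {
            subString = SUB_ALL;
            return;
        }
        subString = expression;
        for (String raw : expression.split("\\|\\|")) {
            String tag = raw.trim();
            if (!tag.isEmpty()) {
                tagsSet.add(tag);
                codeSet.add(tag.hashCode());
            }
        }
    }

    /** Same order of checks as the tag branch of isMatchedByConsumeQueue. */
    boolean matchesByTagsCode(Long tagsCode) {
        if (tagsCode == null) {
            return true; // no usable code in the entry: let the message through
        }
        if (SUB_ALL.equals(subString)) {
            return true; // subscribed to every tag
        }
        // Narrow to int: a Set<Integer> never contains a Long.
        return codeSet.contains(tagsCode.intValue());
    }

    public static void main(String[] args) {
        SubscriptionSketch sub = new SubscriptionSketch("TagA || TagB");
        System.out.println(sub.matchesByTagsCode((long) "TagB".hashCode())); // true
        System.out.println(sub.matchesByTagsCode((long) "TagC".hashCode())); // false
        System.out.println(sub.matchesByTagsCode(null));                     // true
    }
}
```

    Every uncertain case returns true, so the broker filter errs on the side of delivering a message and leaves the final decision to the consumer.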

    Tag matching logic on the consumer

    public class PullAPIWrapper {
        private final InternalLogger log = ClientLogger.getLog();
        private final MQClientInstance mQClientFactory;
        private final String consumerGroup;
        private final boolean unitMode;
        private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
            new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
        private volatile boolean connectBrokerByUser = false;
        private volatile long defaultBrokerId = MixAll.MASTER_ID;
        private Random random = new Random(System.currentTimeMillis());
        private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    
        public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {
            this.mQClientFactory = mQClientFactory;
            this.consumerGroup = consumerGroup;
            this.unitMode = unitMode;
        }
    
        public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
            final SubscriptionData subscriptionData) {
            PullResultExt pullResultExt = (PullResultExt) pullResult;
    
            this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
            if (PullStatus.FOUND == pullResult.getPullStatus()) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
                List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
    
                List<MessageExt> msgListFilterAgain = msgList;
                if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                    msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                    for (MessageExt msg : msgList) {
                        // Compare the message's tag with the tag set in subscriptionData
                        if (msg.getTags() != null) {
                            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                                msgListFilterAgain.add(msg);
                            }
                        }
                    }
                }
    
                if (this.hasHook()) {
                    FilterMessageContext filterMessageContext = new FilterMessageContext();
                    filterMessageContext.setUnitMode(unitMode);
                    filterMessageContext.setMsgList(msgListFilterAgain);
                    this.executeHook(filterMessageContext);
                }
    
                for (MessageExt msg : msgListFilterAgain) {
                    String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (Boolean.parseBoolean(traFlag)) {
                        msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
                    }
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                        Long.toString(pullResult.getMinOffset()));
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                        Long.toString(pullResult.getMaxOffset()));
                }
    
                pullResultExt.setMsgFoundList(msgListFilterAgain);
            }
    
            pullResultExt.setMessageBinary(null);
    
            return pullResult;
        }
    }
    
    • On the consumer side, the filtering happens in PullAPIWrapper#processPullResult.
    • subscriptionData.getTagsSet().contains(msg.getTags()) keeps only the messages whose tag is in the subscription.
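
    The second pass can be reduced to a few lines that make its edge cases visible. The sketch below is a simplified stand-in, not the client's code: ConsumerRefilterSketch, Msg and filterAgain are hypothetical names. It follows the same rules as the loop above: an empty tag set means no filtering, and a message without a tag is dropped when a tag set is present.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch of the consumer-side second pass; not RocketMQ code. */
class ConsumerRefilterSketch {

    /** Hypothetical minimal message: a tag (may be null) and a body. */
    static final class Msg {
        final String tags;
        final String body;

        Msg(String tags, String body) {
            this.tags = tags;
            this.body = body;
        }
    }

    /** Keeps only messages whose exact tag string is in the subscription. */
    static List<Msg> filterAgain(List<Msg> pulled, Set<String> tagsSet) {
        if (tagsSet.isEmpty()) {
            return pulled; // no tags subscribed explicitly: nothing to filter
        }
        List<Msg> kept = new ArrayList<>(pulled.size());
        for (Msg msg : pulled) {
            if (msg.tags != null && tagsSet.contains(msg.tags)) {
                kept.add(msg);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Msg> pulled = Arrays.asList(
            new Msg("Aa", "wanted"),
            new Msg("BB", "hash collision with Aa"),
            new Msg(null, "untagged"));
        Set<String> tagsSet = new HashSet<>(Arrays.asList("Aa"));

        for (Msg msg : filterAgain(pulled, tagsSet)) {
            System.out.println(msg.body); // prints "wanted"
        }
    }
}
```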
