美文网首页rocketMq理论与实践
RocketMq 重置消费位点逻辑

RocketMq 重置消费位点逻辑

作者: 晴天哥_王志 | 来源:发表于2020-06-21 23:54 被阅读0次

    系列

    开篇

    • 这篇文章的主要目的是分析RocketMq根据时间戳重置某topic在某consumeGroup下的消费位点。

    • 重置位点的执行顺序按照admin 到 broker 到 consumer的顺序依次触发,admin负责构建参数通知broker,broker负责查询consumeQueue的具体位移,broker负责通知consumer进行位移重置。

    • 根据时间戳查找consumeQueue对应的位移,然后由broker通知consumer来持久化消费位移,最终会持久化到broker的消费位移。

    • 重置位点操作本质上是在client端执行,consumer端负责持久化新的消费位移然后由定时任务通知broker更新消费位移。

    • consumer在整个位移重置过程中会设置ProcessQueue的状态为Dropped,从而阻断消息拉取任务ConsumeRequest的执行阻断消息拉取,其次会在consumer侧修改消费位移通过心跳通知broker修改consumer的消费位移,最后通过重新的rebalance过程开始重新消费消息。

    重置命令

    public class ResetOffsetByTimeCommand implements SubCommand {
    
        @Override
        public String commandName() {
            return "resetOffsetByTime";
        }
    
        @Override
        public Options buildCommandlineOptions(Options options) {
            Option opt = new Option("g", "group", true, "set the consumer group");
            opt.setRequired(true);
            options.addOption(opt);
    
            opt = new Option("t", "topic", true, "set the topic");
            opt.setRequired(true);
            options.addOption(opt);
    
            opt = new Option("s", "timestamp", true, "set the timestamp[now|currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
            opt.setRequired(true);
            options.addOption(opt);
    
            opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]");
            opt.setRequired(false);
            options.addOption(opt);
    
            opt = new Option("c", "cplus", false, "reset c++ client offset");
            opt.setRequired(false);
            options.addOption(opt);
            return options;
        }
    
        @Override
        public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
            DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
            defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
            try {
                String group = commandLine.getOptionValue("g").trim();
                String topic = commandLine.getOptionValue("t").trim();
                String timeStampStr = commandLine.getOptionValue("s").trim();
                long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : 0;
    
                try {
                    if (timestamp == 0) {
                        timestamp = Long.parseLong(timeStampStr);
                    }
                } catch (NumberFormatException e) {
    
                    timestamp = UtilAll.parseDate(timeStampStr, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
                }
    
                boolean force = true;
                if (commandLine.hasOption('f')) {
                    force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
                }
    
                boolean isC = false;
                if (commandLine.hasOption('c')) {
                    isC = true;
                }
    
                defaultMQAdminExt.start();
                Map<MessageQueue, Long> offsetTable;
                try {
                    // 通过defaultMQAdminExt#resetOffsetByTimestamp来执行
                    offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
                } catch (MQClientException e) {
                    if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
                        ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, group, topic, timestamp, force, timeStampStr);
                        return;
                    }
                    throw e;
                }
            } catch (Exception e) {
                throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
            } finally {
                defaultMQAdminExt.shutdown();
            }
        }
    }
    
    • ResetOffsetByTimeCommand的命令格式如上代码所示,核心调用DefaultMQAdminExt#resetOffsetByTimestamp来重置。

    admin

    public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
    
        public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
            boolean isC)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
            // 通过defaultMQAdminExtImpl#resetOffsetByTimestamp重置位移
            return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC);
        }
    }
    
    
    public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
    
        public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
            boolean isC)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    
            // 1、获取topic对应的TopicRouteData信息,进而获取对应的BrokerData信息
            TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
            List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
            Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
            if (brokerDatas != null) {
                // 2、遍历topic所在的broker节点信息
                for (BrokerData brokerData : brokerDatas) {
                    String addr = brokerData.selectBrokerAddr();
                    if (addr != null) {
                        // 2、远程调用broker来执行重置操作
                        Map<MessageQueue, Long> offsetTable =
                            this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                                timeoutMillis, isC);
                        if (offsetTable != null) {
                            allOffsetTable.putAll(offsetTable);
                        }
                    }
                }
            }
            return allOffsetTable;
        }
    }
    
    • 获取topic对应的TopicRouteData信息,进而获取对应的BrokerData信息。
    • 遍历topic所在的broker节点信息,远程调用broker来执行重置操作。
    public class MQClientAPIImpl {
    
        public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
            final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
            throws RemotingException, MQClientException, InterruptedException {
            ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
            requestHeader.setTopic(topic);
            requestHeader.setGroup(group);
            requestHeader.setTimestamp(timestamp);
            requestHeader.setForce(isForce);
    
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
            if (isC) {
                request.setLanguage(LanguageCode.CPP);
            }
    
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
                request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    if (response.getBody() != null) {
                        ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                        return body.getOffsetTable();
                    }
                }
                default:
                    break;
            }
    
            throw new MQClientException(response.getCode(), response.getRemark());
        }
    }
    
    • 远程调用的RequestCode为INVOKE_BROKER_TO_RESET_OFFSET。
    • 通过RequestCode可以方便查找BrokerProcessor便于分析。

    broker

    public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
        public RemotingCommand resetOffset(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            // 解析requestHeader的信息
            final ResetOffsetRequestHeader requestHeader =
                (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
    
            boolean isC = false;
            LanguageCode language = request.getLanguage();
            switch (language) {
                case CPP:
                    isC = true;
                    break;
            }
            // 通过Broker2Client#resetOffset重置位移
            return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
                requestHeader.getTimestamp(), requestHeader.isForce(), isC);
        }
    }
    
    public class Broker2Client {
    
        public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
                                           boolean isC) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            // 获取TopicConfig相关信息
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
                return response;
            }
    
            Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
            // 遍历所有的写队列并获取每个MessageQueue的消费位移
            for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
                MessageQueue mq = new MessageQueue();
                mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                mq.setTopic(topic);
                mq.setQueueId(i);
                // 查询每个messageQueue查询对应的consumeQueue的消费位移consumerOffset
                long consumerOffset =
                    this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
                if (-1 == consumerOffset) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark(String.format("THe consumer group <%s> not exist", group));
                    return response;
                }
                
                // 根据时间戳查询consumeQueue的位移
                long timeStampOffset;
                if (timeStamp == -1) {
                    // 没时间戳就获取consumeQueue的最大位移
                    timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
                } else {
                    // 根据时间戳查找consumeQueue的最大位移
                    timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
                }
    
                if (timeStampOffset < 0) {
                    log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
                    timeStampOffset = 0;
                }
    
                if (isForce || timeStampOffset < consumerOffset) {
                    offsetTable.put(mq, timeStampOffset);
                } else {
                    offsetTable.put(mq, consumerOffset);
                }
            }
            // RequestCode 为 RESET_CONSUMER_CLIENT_OFFSET
            ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
            requestHeader.setTopic(topic);
            requestHeader.setGroup(group);
            requestHeader.setTimestamp(timeStamp);
            RemotingCommand request =
                RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
            if (isC) {
                // c++ language
                ResetOffsetBodyForC body = new ResetOffsetBodyForC();
                List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
                body.setOffsetTable(offsetList);
                request.setBody(body.encode());
            } else {
                // other language
                ResetOffsetBody body = new ResetOffsetBody();
                body.setOffsetTable(offsetTable);
                request.setBody(body.encode());
            }
            // 获取consumeGroup的所有consumer信息并通知位移重置
            ConsumerGroupInfo consumerGroupInfo =
                this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
    
            if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
                ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
                    consumerGroupInfo.getChannelInfoTable();
                for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
                    int version = entry.getValue().getVersion();
                    if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                        try {
                            this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                        } catch (Exception e) {
                        }
                    } else {
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("the client does not support this feature. version="
                            + MQVersion.getVersionDesc(version));
                        
                        return response;
                    }
                }
            } else {
                String errorInfo =
                    String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                        requestHeader.getGroup(),
                        requestHeader.getTopic(),
                        requestHeader.getTimestamp());
                log.error(errorInfo);
                response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
                response.setRemark(errorInfo);
                return response;
            }
            response.setCode(ResponseCode.SUCCESS);
            ResetOffsetBody resBody = new ResetOffsetBody();
            resBody.setOffsetTable(offsetTable);
            response.setBody(resBody.encode());
            return response;
        }
    }
    
    • Broker2Client#resetOffset负责执行broker侧的消费位点的流程。
    • 获取topic对应TopicConfig,进而获取topic的所有写队列,
    • 遍历所有的写队列,结合consumeGroup和topic获取该topic在consumeGroup下所有的队列的根据时间戳查找的消费位移。
    • 获取consumeGroup下所有的client端信息,依次调用client通知消费位移
    public class DefaultMessageStore implements MessageStore {
    
        public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
            // 先根据topic和queueId查找对应的ConsumeQueue对象
            ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
            if (logic != null) {
                // 针对ConsumeQueue对象根据时间戳进行查找返回对应的位移
                return logic.getOffsetInQueueByTime(timestamp);
            }
    
            return 0;
        }
    
        public ConsumeQueue findConsumeQueue(String topic, int queueId) {
            ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
            if (null == map) {
                ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
                ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
                if (oldMap != null) {
                    map = oldMap;
                } else {
                    map = newMap;
                }
            }
    
            ConsumeQueue logic = map.get(queueId);
            if (null == logic) {
                ConsumeQueue newLogic = new ConsumeQueue(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                    this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                    this);
                ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
                if (oldLogic != null) {
                    logic = oldLogic;
                } else {
                    logic = newLogic;
                }
            }
    
            return logic;
        }
    }
    
    • 根据topic和queueId查找对应的ConsumeQueue对象。
    public class ConsumeQueue {
    
        public long getOffsetInQueueByTime(final long timestamp) {
            // 根据timestamp查找ConsumeQueue,文件最后更新时间戳大于该timestamp的文件
            MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
            if (mappedFile != null) {
                long offset = 0;
                int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
                int high = 0;
                int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
                long leftIndexValue = -1L, rightIndexValue = -1L;
    
                // 获取commitLog的最小物理偏移量
                long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
                SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
                if (null != sbr) {
                    ByteBuffer byteBuffer = sbr.getByteBuffer();
                    high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
                    try {
                        // 二分查找确定时间戳对应的位移
                        while (high >= low) {
                            // 二分查找的midOffset及对应的commitLog的phyOffset
                            midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                            byteBuffer.position(midOffset);
                            long phyOffset = byteBuffer.getLong();
                            int size = byteBuffer.getInt();
                            // 比较二分查找中间值的位移和minPhysicOffset进行比较
                            // 如果中间值偏小那么就从中间值往右进行查找
                            if (phyOffset < minPhysicOffset) {
                                low = midOffset + CQ_STORE_UNIT_SIZE;
                                leftOffset = midOffset;
                                continue;
                            }
                            // 根据consumeQueue的记录的commitLog的消息位移和文件大小查找对应的消息存储时间
                            long storeTime =
                                this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                            // 判断storeTime的时间和timestamp进行下比较
                            if (storeTime < 0) {
                                return 0;
                            } else if (storeTime == timestamp) {
                                // 查找到目的commitLog的位移
                                targetOffset = midOffset;
                                break;
                            } else if (storeTime > timestamp) {
                                // 查找到消息的存储时间比当前查找的timestamp大
                                high = midOffset - CQ_STORE_UNIT_SIZE;
                                rightOffset = midOffset;
                                rightIndexValue = storeTime;
                            } else {
                                // 查找到消息的存储时间比当前查找的timestamp小
                                low = midOffset + CQ_STORE_UNIT_SIZE;
                                leftOffset = midOffset;
                                leftIndexValue = storeTime;
                            }
                        }
    
                        if (targetOffset != -1) {
                            offset = targetOffset;
                        } else {
                            if (leftIndexValue == -1) {
                                offset = rightOffset;
                            } else if (rightIndexValue == -1) {
                                offset = leftOffset;
                            } else {
                                offset =
                                    Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                        - rightIndexValue) ? rightOffset : leftOffset;
                            }
                        }
    
                        return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
                    } finally {
                        sbr.release();
                    }
                }
            }
            return 0;
        }
    }
    
    • 通过二分查找方法来定位重置时间戳对应的consumeQueue的位移。

    consumer

    public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
        public RemotingCommand resetOffset(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            // 1、解析ResetOffsetRequestHeader对象
            final ResetOffsetRequestHeader requestHeader =
                (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
            // 2、解析重置的offsetTable
            Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
            if (request.getBody() != null) {
                ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
                offsetTable = body.getOffsetTable();
            }
    
            // 3、通过offsetTable来重置client的消费位移
            this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
            return null;
        }
    }
    
    • ClientRemotingProcessor#resetOffset负责执行broker通知client的重置位移命令。
    public class MQClientInstance {
    
        public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
            DefaultMQPushConsumerImpl consumer = null;
            try {
                MQConsumerInner impl = this.consumerTable.get(group);
                if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
                    consumer = (DefaultMQPushConsumerImpl) impl;
                } else {
                    log.info("[reset-offset] consumer dose not exist. group={}", group);
                    return;
                }
                consumer.suspend();
    
                ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
                for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                    MessageQueue mq = entry.getKey();
                    // 重置所有的ProcessQueue标记为dropped
                    if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                        ProcessQueue pq = entry.getValue();
                        pq.setDropped(true);
                        pq.clear();
                    }
                }
    
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                }
    
                Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
                while (iterator.hasNext()) {
                    MessageQueue mq = iterator.next();
                    Long offset = offsetTable.get(mq);
                    if (topic.equals(mq.getTopic()) && offset != null) {
                        try {
                            consumer.updateConsumeOffset(mq, offset);
                            consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                            iterator.remove();
                        } catch (Exception e) {
                            log.warn("reset offset failed. group={}, {}", group, mq, e);
                        }
                    }
                }
            } finally {
                if (consumer != null) {
                    consumer.resume();
                }
            }
        }
    }
    
    • consumer会匹配所有重置位移的MessageQueue,然后设置对应的ProcessQueue的状态为Dropped。
    • consumer会针对所有的MessageQueue持久化对应的重置消费位移。
    • consumer会移除processQueueTable所有相关的MessageQueue对象。
    • ConsumeRequest在ProcessQueue的状态为Dropped会停止拉取消息。
    • consumer在定时rebalance过程中会重新生成ConsumeRequest并重新开始拉取消息。

    相关文章

      网友评论

        本文标题:RocketMq 重置消费位点逻辑

        本文链接:https://www.haomeiwen.com/subject/exuoxktx.html