美文网首页
(十六)主从与读写分离

(十六)主从与读写分离

作者: guessguess | 来源:发表于2021-08-26 16:53 被阅读0次

    先前思考了一个问题,rocketmq的主从是如何实现的。
    最后翻了一下源码,做一个简单的记录。

    当broker中master宕机之后,消费者为什么还可以从slave中获取消息?
    在前面得知,消费者消费前,会进行负载均衡,进行对队列的重新分配。
    随后再根据分配的队列,通过队列的brokername找到对应的broker集群。
    最后再进行消息拉取。
    那么这个Broker集群中的master出现宕机,消费者得首先解决消息拉取的问题,才可以继续消费队列。

    由于一个m对应多个s,m与s所存储的消息内容都是一致的,master存储消息的同时会与slave进行同步。
    所以这里当m宕机之后,消费者可以从任意一个s中继续拉取消息,从而继续完成消费。

    那么实现是怎么样的?代码比较简单

    public class PullAPIWrapper {
        public PullResult pullKernelImpl(
            final MessageQueue mq,
            final String subExpression,
            final String expressionType,
            final long subVersion,
            final long offset,
            final int maxNums,
            final int sysFlag,
            final long commitOffset,
            final long brokerSuspendMaxTimeMillis,
            final long timeoutMillis,
            final CommunicationMode communicationMode,
            final PullCallback pullCallback
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            找到最终拉取消息的地址
            FindBrokerResult findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
            if (null == findBrokerResult) {
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
                findBrokerResult =
                    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                        this.recalculatePullFromWhichNode(mq), false);
            }
    
            if (findBrokerResult != null) {
                {
                    // check version
                    if (!ExpressionType.isTagType(expressionType)
                        && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                        throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                            + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                    }
                }
                int sysFlagInner = sysFlag;
    
                if (findBrokerResult.isSlave()) {
                    sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
                }
    
                PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
                requestHeader.setConsumerGroup(this.consumerGroup);
                requestHeader.setTopic(mq.getTopic());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setQueueOffset(offset);
                requestHeader.setMaxMsgNums(maxNums);
                requestHeader.setSysFlag(sysFlagInner);
                requestHeader.setCommitOffset(commitOffset);
                requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
                requestHeader.setSubscription(subExpression);
                requestHeader.setSubVersion(subVersion);
                requestHeader.setExpressionType(expressionType);
    
                String brokerAddr = findBrokerResult.getBrokerAddr();
                if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                    brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
                }
    
                PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                    brokerAddr,
                    requestHeader,
                    timeoutMillis,
                    communicationMode,
                    pullCallback);
    
                return pullResult;
            }
    
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    
    }
    

    从上面的代码看,关键点还是在于如何找到地址。

    查找地址

    public class MQClientInstance {
        public FindBrokerResult findBrokerAddressInSubscribe(
            final String brokerName,
            final long brokerId,
            final boolean onlyThisBroker
        ) {
            String brokerAddr = null;
            boolean slave = false;
            boolean found = false;
            根据broker名字找到各个brokerid对应的ip地址映射
            HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
            if (map != null && !map.isEmpty()) {
                查找Brokerid对应的地址
                brokerAddr = map.get(brokerId);
                brokerid不为0,说明为slave
                slave = brokerId != MixAll.MASTER_ID;
                地址为空,说明没找到
                found = brokerAddr != null;
                如果没找到对应地址,且brokerid为slave,则寻找其他slave
                if (!found && slave) {
                    brokerAddr = map.get(brokerId + 1);
                    found = brokerAddr != null;
                }
                如果没找到,且可以是其他broker,则用其他broker即可
                if (!found && !onlyThisBroker) {
                    Entry<Long, String> entry = map.entrySet().iterator().next();
                    brokerAddr = entry.getValue();
                    slave = entry.getKey() != MixAll.MASTER_ID;
                    found = true;
                }
            }
            找到则返回最终的实际地址
            if (found) {
                return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
            }
    
            return null;
        }
    
    }
    

    首先,如果brokerid=0(为masterid)的服务出现宕机,那么brokerAddrTable中的map也会移除Brokerid=0这个键值对(注册中心若一段时间没有收到心跳,就会清理过期的broker,其他broker会每隔一段时间与注册中心同步路由信息)。
    当brokerAddrTable没有brokerid=0的键值对的时候,就会考虑获取其他broker(slave)的ip地址。
    所以主从功能就这么完成了,说白了就是当brokerid=0不可用时,考虑从slave中获取,也可以拉取到消息,完成消费。

    读写分离

    在发送消息的时候,brokerid的传入并不是写死的。
    而是通过一个函数this.recalculatePullFromWhichNode(mq)获取。
    下面先来看看这个函数的实现

        public long recalculatePullFromWhichNode(final MessageQueue mq) {
            if (this.isConnectBrokerByUser()) {
                return this.defaultBrokerId;
            }
    
            AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
            if (suggest != null) {
                return suggest.get();
            }
    
            return MixAll.MASTER_ID;
        }
    

    代码比较简单,默认就是根据pullFromWhichNodeTable获取对应的brokerid。否则默认就是masterid=0;

    这里可以理解成,rocketmq并不是每次都是master中拉取数据的,而是根据pullFromWhichNodeTable来获取对应的brokerid。当pullFromWhichNodeTable获取不到对应的数据时,就默认使用masterid。向brokerid=0的Broker拉取数据。

    pullFromWhichNodeTable的生成

    通过定位,pullFromWhichNodeTable的更新以及生成,其实是在向某个broker拉取完消息后,该broker建议下次拉取的brokerid。
    先来看看生成的方法

    public class PullAPIWrapper {
        public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
            AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
            if (null == suggest) {
                this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
            } else {
                suggest.set(brokerId);
            }
        }
    }
    

    调用的时机,向broker拉取完消息后,进行更新。

    public class PullAPIWrapper {
        public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
            final SubscriptionData subscriptionData) {
            PullResultExt pullResultExt = (PullResultExt) pullResult;
    
            this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
            ........
        }
    }
    

    那么broker是如何生成这个SuggestWhichBrokerId?

    代码位置在于org.apache.rocketmq.broker.processor.PullMessageProcessor.processRequest(Channel, RemotingCommand, boolean)

    public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
        private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
            throws RemotingCommandException {
            省略代码........
            final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
            if (getMessageResult != null) {
                response.setRemark(getMessageResult.getStatus().name());
                responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
                responseHeader.setMinOffset(getMessageResult.getMinOffset());
                responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
                如果建议从slave拉取
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    //SuggestWhichBrokerId设置为1,subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()返回的值为1。
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                } else {
                    否则下次还是从master中拉取
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
         省略代码////
         }
    }
    

    从代码上看,Broker自己会根据情况,返回下次拉取的borkerid。如果建议从slave拉取,返回1,否则返回0.

    那么是如何判断是否需要从slave中拉取呢?

    public class DefaultMessageStore implements MessageStore {
        public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
            final int maxMsgNums,
            final MessageFilter messageFilter) {
            省略若干代码
                            long diff = maxOffsetPy - maxPhyOffsetPulling;
                            long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                                * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                            getResult.setSuggestPullingFromSlave(diff > memory);
             省略若干代码
        }
    }
    

    这里的判断也是比较简单,说白了就是拉取处理的消息比消息常驻内存大小还要大的话,表示繁忙,所以就建议下次从slave拉取消息。

    相关文章

      网友评论

          本文标题:(十六)主从与读写分离

          本文链接:https://www.haomeiwen.com/subject/rbkuiltx.html