之前看rocketmq,然后在想一个问题,就是一主一从的集群结构中,如果master宕机了,consumer这边是怎么选择的,按照官方说明中,master挂了,但是slave得消息仍然可以被消费到。原因是master和slave是一直有连接的,所以master上面的消息是可以及时同步到slave,但是终究是有一部分留在master(异步复制)的时候。
那么问题来了,consumer是怎么从master上面切换到slave上继续消费消息呢?首先明确一点,master宕机,就意味着这个broker不再写入,但是因为slave还在,所以还可以继续读。所以我们看一下consumer是怎么选择的?
从Pullconsumer 进去,调用pullBlockIfNotFound方法,一直进去,最后到:DefaultMQPullConsumerImpl的pullSyncImpl 方法,最后看到这段:
PullResult pullResult =this.pullAPIWrapper.pullKernelImpl(mq,suçbscriptionData.getSubString(),0L,offset,maxNums,sysFlag,0,this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),timeoutMillis,CommunicationMode.SYNC,null);
进去看一下实现:
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq),false);
其实这段也很好理解,就是先根据订阅的关系找到broker。是根据broker的名字和id共同起作用的。broker的名字肯定能够拿到一堆broker的id,一般都是一主多从,那这个怎么选呢?
进去看看就知道了:
HashMap map =this.brokerAddrTable.get(brokerName);
if(map !=null&& !map.isEmpty()) {
brokerAddr = map.get(brokerId);
slave = brokerId != MixAll.MASTER_ID;
found = brokerAddr !=null;
if(!found && !onlyThisBroker) {
Entry entry = map.entrySet().iterator().next();
brokerAddr = entry.getValue();
slave = entry.getKey() != MixAll.MASTER_ID;
found =true;
}
}
先拿到broker的缓存,其实就是存在本地的hashmap,然后根据broker的id查找,如果找到了,判断下是不是slave角色返回,找不到的情况下就根据那拿到的列表迭代一个出来,考虑到时无序的,所以就可以理解为随机拿一个出来了,再判定角色。
所以传进来的brokerId非常重要,如果这台机器没有宕机的情况下,就是返回这个broker的地址了,否则就是从剩下的机器进行随机一个。
那传进去的brokerId是怎么产生的呢?
public longrecalculatePullFromWhichNode(finalMessageQueue mq) {
if(this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
}
AtomicLong suggest =this.pullFromWhichNodeTable.get(mq);
if(suggest !=null) {
returnsuggest.get();
}
returnMixAll.MASTER_ID;
}
传入的mq是负载均衡服务分配给当前consumer消费的队列,它必然是属于一个brokername唯一的拓扑结构中,即一主多从的几台机器中,从哪个机器选就很重要了,因为都可以选的。
1.先看isConnectBrokerByUser 是否设置,如果设置,返回默认的,即0.
2.看缓存中是否已经存了建议值,如果存了,直接返回
3.返回master的,即0.
那到这里,我们看一下怎么解释一下宕机时主从切换过程,consumer时如何从主上面切换到从上面的?
1.一开始时正常的,因为没有缓存,也没有特别设置,所以,进入3 返回master。
2.master 写入缓存,后面都读取到缓存,在上面的步骤2中返回。
master宕机了,然后nameserver中不再收到心跳,然后master机器剔除掉。所以consumer虽然选到了master,但是因为在地址中找不到broker'id=0的数据,于是进入随机过程,然后这样就切到了slave,然后slave写入到缓存。后面一直读到缓存中的slave。
问题来了:master起来后,建议值的缓存也没有更新,那怎么切回到master,毕竟我们是因为宕机产生地址找不到的时候,才能完成切换的,这解释不通。
后来debug了一发,发现这个suggest值并不是consumer端决定的,而是broker决定的。啥意思,即使你是拉去slave上面的数据,slave上面返回的结果中的suggest值也可能是0,然后0就写进缓存中,下一次,你还是优先访问master,然后master没有地址,访问slave。
这个意思就是,consumer端是根据缓存中的suggest值优先选机器。但是呢这个suggest是通过broker传回来的。所以即使是访问slave,传回来的suggest值仍然是master,只不过客户端没有master的映射关系,所以继续访问slave。这就能解释为什么master从宕机起来后,consumer能够切回master,因为地址映射表得到更新了,nameserver中有了master的信息了。
那么重点来了:
什么时候,broker返回的建议值是0?
什么时候,broker返回的建议值是其他值?
1.如果master中的堆积信息过多,默认返回consumerslow配置,默认是1.(所以机器的brokerId真的不能乱用),这个时候就切到slave了。
2.
未完待续~
网友评论