美文网首页
NameServer架构设计--源码解读

NameServer架构设计--源码解读

作者: hei禹 | 来源:发表于2020-01-28 17:14 被阅读0次

    Broker消息服务器在启动时向所有NameServer注册,消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台消息服务器进行消息发送。NameServer与每台Broker服务器保持长连接,并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,为什么要这么设计呢?这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。

    上面这段话是截取自《RocketMQ技术内幕》2.1节:NameServer架构设计,主要叙述了NameServer主要的架构设计和实现思路。本文就上面这段话进行拆解,一一进行源码解读。

    1、Broker消息服务器在启动时向所有NameServer注册

    // BrokerController#start
    public void start() throws Exception {
        
        // ...
        
        this.registerBrokerAll(true, false, true);
    
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
        // ...
    }
    

    Broker在启动时会调用registerBrokerAll函数,该函数实际会向NameServer发送REGISTER_BROKER请求,进行注册,注册的内容有哪些呢?

    {
        "dataVersion":{
            "counter":2,
            "timestamp":1579838252574
        },
        "topicConfigTable":{
            "SELF_TEST_TOPIC":Object{...},
            "DefaultCluster":Object{...},
            "RMQ_SYS_TRANS_HALF_TOPIC":Object{...},
            "DESKTOP-FJIT15L":Object{...},
            "TBW102":{
                "order":false,
                "perm":7,
                "readQueueNums":8,
                "topicFilterType":"SINGLE_TAG",
                "topicName":"TBW102",
                "topicSysFlag":0,
                "writeQueueNums":8
            },
            "BenchmarkTest":Object{...},
            "OFFSET_MOVED_EVENT":Object{...}
        }
    }
    

    除此之外,还有brokerAddr、brokerId、brokerName、clusterName等。

    在Broker启动时,还会启动一个定时任务,进行定时注册上报,默认30s执行一次。

    向所有NameServer注册体现在,通过一个for循环,遍历所有NameServer,向其注册:

    // BrokerOuterAPI#registerBrokerAll
    public List<RegisterBrokerResult> registerBrokerAll(...) {
        
        // ...
    
        // 来自启动配置
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    // ...
                    
                    RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                    
                    // ...
                }
            });
        }
    
        // ...
    }
    

    2、NameServer与每台Broker服务器保持长连接

    看Broker端的代码

    // BrokerController#start
    public void start() throws Exception {
        
        // ...
        
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        
        // ...
    }
    

    start函数实际是Netty的初始化:NettyRemotingClient#start。而实际的网络连接,则进行了本地缓存,如果连接已存在且可以,复用之,否则会重新创建新的连接:

    // NettyRemotingClient#getAndCreateChannel
    private Channel getAndCreateChannel(final String addr) throws InterruptedException {
        if (null == addr) {
            return getAndCreateNameserverChannel();
        }
    
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    
        return this.createChannel(addr);
    }
    
    // NettyRemotingClient#createChannel
    private Channel createChannel(final String addr) throws InterruptedException {
        // ...
        
        if (createNewConnection) {
            ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
            
            cw = new ChannelWrapper(channelFuture);
            this.channelTables.put(addr, cw);
        }
         
        // ...
    }
    

    channelTables的key是server地址(ip:port),因此对于多个NameServer,就有多个KV对。

    3、并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者

    // NamesrvController#initialize
    public boolean initialize() {
    
        // ...
    
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
    
        // ...
    }
    

    书中“并间隔30s检测Broker是否存活”的描述并不准确,Broker定时注册的间隔是30s,而NameServer定时检测Broker是否宕机的间隔是10s。

    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME /* 120秒 */) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
    

    NameServer每隔10秒扫描brokerLiveTable,如果Broker的上次更新时间距今超过120秒,则认为Broker失效,移除该Broker,删除与该Broker相关的路由信息,关闭与Broker连接。

    Broker的上次更新时间是什么时候更新的?答案是,Broker有一个30秒的定时注册逻辑,NameServer在收到请求后,会更新这个时间戳(RouteInfoManager#registerBroker)。

    关于"路由变化不会马上通知消息生产者"的描述,首先,NameServer检测到Broker宕机是有延迟的,连续120s没收到Broker的心跳,才会认为异常进行移除。其次,NameServer检测到宕机后不会通知给Producer,需要依靠Producer自身的定时任务去更新topic的路由信息,这个间隔是30s。也就是说,在这个间隔内,Produer是不知道Broker已经宕机了。因此需要在发送时,有额外的逻辑保障发送的高可用。

    // MQClientInstance#startScheduledTask
    private void startScheduledTask() {
        // ...
        
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    
        // ...
    }
    

    4、消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台消息服务器进行消息发送

    5、在消息发送端提供容错机制来保证消息发送的高可用性

    Broker选择、消息队列选择、容错发送,我们合一起讲,用example.quickstart.Producer进行分析。

    // DefaultMQProducerImpl#sendDefaultImpl
    private SendResult sendDefaultImpl(...) {
        // ...
        
        // 获取到topic路由信息,才能执行下一步发送
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            // ...
            
            // 重试次数
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            MessageQueue mq = null;
            for (; times < timesTotal; times++) {
                
                // ...
                
                // 选择队列
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                mq = mqSelected;
    
                // ...
                
                // 发送
                this.sendKernelImpl(msg, mq, ...);
                
                // ...
             }
        }
        
        // ...
    }
    

    大致的流程是:

    1、获取topic的路由信息

    2、采用重试机制,for循环执行

    • 选择消息队列

    • 发送消息

    获取topic路由信息:在发送消息之前,需要知道要发送到哪个Broker,这部分逻辑都封装在tryToFindTopicPublishInfo,具体的分析可以参考RocketMQ自动创建topic,或者《RocketMQ技术内幕》3.4.2节。

    获取到的路由信息数据结构如下:


    topicPublisInfo数据结构.png

    再看下messageQueueList的数据是什么样的。由MQClientInstance#topicRouteData2TopicPublishInfo的逻辑,我们知道messageQueueList中的元素是按brokerName + queueId排序的,举例说明:

    [
        {
            "brokerName":"broker-a",
            "queueId":0
        },
        {
            "brokerName":"broker-a",
            "queueId":1
        },
        {
            "brokerName":"broker-b",
            "queueId":0
        },
        {
            "brokerName":"broker-b",
            "queueId":1
        }
    ]
    

    发到哪个Broker确定后,需要确定发到哪个队列。

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // 触发重试时,lastBrokerName就不为null
            int index = this.sendWhichQueue.getAndIncrement();
            
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                // pos指针往后移
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // 取brokerName不相同的
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
    
    public MessageQueue selectOneMessageQueue() {
        // 第1次时,会随机生成一个index,后面发送一次就+1
        int index = this.sendWhichQueue.getAndIncrement();
        // 取模,分配到其中一个队列中
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
    
    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            // 随机数
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }
    
        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;
    
        this.threadLocalIndex.set(index);
        return index;
    }
    

    对于消息发送成功的:第1次发送时,会先随机生成一个index,用index%messageQueueList.size(),确定落到哪个队列。后续消息index会累加,消息就发送到别的队列去了。

    对于消息发送失败的:重试时,会从messageQueueList依次往后取brokerName跟之前不一样的那个队列。如果只有1个Broker,那最终发送的还是之前那个Broker。

    测试发送4条消息,消息队列共4个,发送情况如下:


    消息队列选择.png

    总结一下就是:如果Broker宕机,如果Producer发送前从NameServer获取到了最新的topic路由信息,那么发送不会有问题;如果发送时,topic路由信息还是包含了宕机的Broker,那会触发重试机制,发送时选择不同的Broker再去发送。

    在消息队列选择中,还有另外一种机制:Broker故障延迟机制,sendLatencyFaultEnable默认时不开启的。它主要是在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中。

    相关文章

      网友评论

          本文标题:NameServer架构设计--源码解读

          本文链接:https://www.haomeiwen.com/subject/phhmthtx.html