美文网首页
RocketMQ阅读笔记之路由中心NameServer

RocketMQ阅读笔记之路由中心NameServer

作者: 九点半的马拉 | 来源:发表于2019-10-31 20:35 被阅读0次

    为什么会有NameServer

    消息中间件一般基于主题的订阅发布机制,消息生产者会发送某一主体(Topic)的消息到消息服务器(Broker),消息服务器负责该消息的持久化存储,消息消费者订阅感兴趣的主题。通常情况下,为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那么消息生产者如何 知道消息要发往哪台消息服务器呢?如果某一台消息服务器宕机了,那么生产者如何在不重启服务的情况下感知。NameServer可以 解决上述问题。

    Broker消息服务器在启动的时候向所有NameServer注册,消息生产者在发送消息之前先从NameServer获取Broker服务器地址列表,然后 根据负载均衡算法从列表中选择一台消息服务器进行消息发送,如果检测到Broker宕机,则从路由注册表中将其移除,但是路由变化不会马上通知消息生产者。

    NameServer本身的高可用可通过部署多台NameServer服务器来实现,但彼此互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同 。

    NameServer作用及重要变量

    NameServer存储路由的基础信息,还能够管理Broker节点,包括路由注册、路由删除等功能。

    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; 
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; 
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; 
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    

    从上面可以看出数据类型都是HashMap, 其中,QueueData记录的是消息队列的信息。

    public class QueueData implements Comparable<QueueData> {
        private String brokerName;
     # 读队列数量
     private int readQueueNums;
     # 写队列数量
     private int writeQueueNums;
     # 读写权限,具体含义参考PermName
     private int perm;
     # topic同步标记,具体含义参考TopicSysFlag
     private int topicSynFlag;
    
    • 记录集群信息,存储集群中所有Broker名称
    public class BrokerData implements Comparable<BrokerData> {
        private String cluster;
     private String brokerName;
     private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    
    • Broker状态信息
    class BrokerLiveInfo {
      private long lastUpdateTimestamp;
     private DataVersion dataVersion;
     private Channel channel;
     private String haServerAddr;
    
    • topicQueueTable: Topic消息队列路由信息 ,消息发送时根据路由表进行负载均衡。
    • brokerAddrTable: Broker基础信息,包含brokerName、所属集群名称、主备Broker地址。
    • clusterAddrTable: Broker状态信息,存储集群中所有Broker名称。
    • brokerLiveTable: Broker状态信息。NameServer每次收到心跳包时会替换该信息。
    • filterServerTable: Broker上的FilterServer列表,用于类模式消息过滤。

    路由注册

    Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp,然后NameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。

    NameServer处理心跳包

    public RegisterBrokerResult registerBroker(
        final String clusterName,
     final String brokerAddr,
     final String brokerName,
     final long brokerId,
     final String haServerAddr,
     final TopicConfigSerializeWrapper topicConfigWrapper,
     final List<String> filterServerList,
     final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
     try {
            try {
                this.lock.writeLock().lockInterruptibly();    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
     if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
     this.clusterAddrTable.put(clusterName, brokerNames);
      }
                brokerNames.add(brokerName);   boolean registerFirst = false;    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
     if (null == brokerData) {
                    registerFirst = true;
      brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
     this.brokerAddrTable.put(brokerName, brokerData);
      }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
      //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
     //The same IP:PORT must only have one record in brokerAddrTable  Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
     while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
     if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
      }
                }
    
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
      registerFirst = registerFirst || (null == oldAddr);   if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
     if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
      }
                        }
                    }
                }
    
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
     new BrokerLiveInfo(
                        System.currentTimeMillis(),
      topicConfigWrapper.getDataVersion(),
      channel,
      haServerAddr));
     if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
      }
    
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
      } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
      }
                }
    
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
     if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
     if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
      result.setMasterAddr(masterAddr);
      }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
      }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
      }
    
        return result; }
    
    • 路由注册需要加写锁,防止并发修改RouteInfoManager中的路由表。首先判断Broker所属集群是否存在,如果不存在,则创建,然后将broker名加入到集群Broker集合中。
    • 维护BrokerData信息,首先从brokerAddrTable根据BrokerName尝试获取Broker信息,如果不存在,则新建BrokerData并放入到brokerAddrTable,registerFirst设置为true;如果存在,直接替换原来的,registerFirst设置为false,表示非第一次注册。
    • 如果Broker为Master,并且Broker Topic配置信息发生变化或者是初次注册,则需要创建或更新Topic路由元数据,填充topicQueueTable,其实就是为默认主题自动注册路由信息。根据TopicConfig创建QueueData的数据结构,然后更新topicQueueTable。
    • 更新BrokerLiveInfo,存活Broker信息表,BrokerLiveInfo是执行路由删除的重要依据。
    • 注册Broker的过滤器Server地址列表,一个Broker上会关联多个FilterServer消息过滤服务器。

    路由删除

    NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

    相关文章

      网友评论

          本文标题:RocketMQ阅读笔记之路由中心NameServer

          本文链接:https://www.haomeiwen.com/subject/hqqxbctx.html