为什么会有NameServer
消息中间件一般基于主题的订阅发布机制,消息生产者会发送某一主体(Topic)的消息到消息服务器(Broker),消息服务器负责该消息的持久化存储,消息消费者订阅感兴趣的主题。通常情况下,为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那么消息生产者如何 知道消息要发往哪台消息服务器呢?如果某一台消息服务器宕机了,那么生产者如何在不重启服务的情况下感知。NameServer可以 解决上述问题。
Broker消息服务器在启动的时候向所有NameServer注册,消息生产者在发送消息之前先从NameServer获取Broker服务器地址列表,然后 根据负载均衡算法从列表中选择一台消息服务器进行消息发送,如果检测到Broker宕机,则从路由注册表中将其移除,但是路由变化不会马上通知消息生产者。
NameServer本身的高可用可通过部署多台NameServer服务器来实现,但彼此互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同 。
NameServer作用及重要变量
NameServer存储路由的基础信息,还能够管理Broker节点,包括路由注册、路由删除等功能。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
从上面可以看出数据类型都是HashMap, 其中,QueueData记录的是消息队列的信息。
public class QueueData implements Comparable<QueueData> {
private String brokerName;
# 读队列数量
private int readQueueNums;
# 写队列数量
private int writeQueueNums;
# 读写权限,具体含义参考PermName
private int perm;
# topic同步标记,具体含义参考TopicSysFlag
private int topicSynFlag;
- 记录集群信息,存储集群中所有Broker名称
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
- Broker状态信息
class BrokerLiveInfo {
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
- topicQueueTable: Topic消息队列路由信息 ,消息发送时根据路由表进行负载均衡。
- brokerAddrTable: Broker基础信息,包含brokerName、所属集群名称、主备Broker地址。
- clusterAddrTable: Broker状态信息,存储集群中所有Broker名称。
- brokerLiveTable: Broker状态信息。NameServer每次收到心跳包时会替换该信息。
- filterServerTable: Broker上的FilterServer列表,用于类模式消息过滤。
路由注册
Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp,然后NameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。
NameServer处理心跳包
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly(); Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName); boolean registerFirst = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr); if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result; }
- 路由注册需要加写锁,防止并发修改RouteInfoManager中的路由表。首先判断Broker所属集群是否存在,如果不存在,则创建,然后将broker名加入到集群Broker集合中。
- 维护BrokerData信息,首先从brokerAddrTable根据BrokerName尝试获取Broker信息,如果不存在,则新建BrokerData并放入到brokerAddrTable,registerFirst设置为true;如果存在,直接替换原来的,registerFirst设置为false,表示非第一次注册。
- 如果Broker为Master,并且Broker Topic配置信息发生变化或者是初次注册,则需要创建或更新Topic路由元数据,填充topicQueueTable,其实就是为默认主题自动注册路由信息。根据TopicConfig创建QueueData的数据结构,然后更新topicQueueTable。
- 更新BrokerLiveInfo,存活Broker信息表,BrokerLiveInfo是执行路由删除的重要依据。
- 注册Broker的过滤器Server地址列表,一个Broker上会关联多个FilterServer消息过滤服务器。
路由删除
NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
网友评论