Overview
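ZooKeeper's data model closely resembles a Linux file system. Unlike a file system, however, ZooKeeper stores data in a structured way and has no separate notion of files and directories: both are abstracted into nodes (DataNode), and the DataNodes together form a DataTree. This section walks through the DataTree/DataNode source code as groundwork for what follows.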
1. Data Structures
[Figure: data model]
1.1 DataTree
public class DataTree {
// path -> DataNode mapping
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
// watch manager for data watches
private final WatchManager dataWatches = new WatchManager();
// watch manager for child watches
private final WatchManager childWatches = new WatchManager();
// client sessionId -> set of paths of the ephemeral nodes created by that session
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
// ACL cache
private final ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();
}
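DataTree is ZooKeeper's in-memory data structure: the tree formed by all DataNodes.
- ConcurrentHashMap<String, DataNode> nodes: maps each node path to its DataNode object and holds every node in the tree;
- WatchManager dataWatches: watch manager for data watches (e.g. NodeCreated, NodeDataChanged);
- WatchManager childWatches: watch manager for child watches (e.g. NodeChildrenChanged);
- Map<Long, HashSet<String>> ephemerals: maps a client sessionId to the set of paths of the ephemeral nodes that session created;
- ReferenceCountedACLCache aclCache: ACL cache.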
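Because nodes is keyed by the full path, looking a node up never walks the tree; the hierarchy lives only in each node's parent reference and its set of child names. The sketch below models that layout with hypothetical MiniTree/MiniNode classes (not ZooKeeper classes) and omits locking. It registers the root under both "" and "/", which is what DataTree does, so that the parent of "/a" (the substring before the last slash, "") resolves to the root.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified node: parent reference, data, and child *names* only.
class MiniNode {
    final MiniNode parent;
    byte[] data;
    final Set<String> children = new TreeSet<>(); // sorted only for readable output

    MiniNode(MiniNode parent, byte[] data) {
        this.parent = parent;
        this.data = data;
    }
}

// Hypothetical simplified tree: full path -> node, like DataTree.nodes (no locking).
class MiniTree {
    final Map<String, MiniNode> nodes = new ConcurrentHashMap<>();

    MiniTree() {
        MiniNode root = new MiniNode(null, new byte[0]);
        nodes.put("", root);  // the root is registered under "" ...
        nodes.put("/", root); // ... and under "/"
    }

    void add(String path, byte[] data) {
        int lastSlash = path.lastIndexOf('/');
        MiniNode parent = nodes.get(path.substring(0, lastSlash));
        if (parent == null) {
            // createNode throws KeeperException.NoNodeException here
            throw new IllegalStateException("no parent for " + path);
        }
        parent.children.add(path.substring(lastSlash + 1));
        nodes.put(path, new MiniNode(parent, data));
    }

    // Direct lookup by full path; no traversal from the root.
    MiniNode get(String path) {
        return nodes.get(path);
    }
}
```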
1.2 DataNode
public class DataNode implements Record {
// parent node
DataNode parent;
// node data
byte data[];
// ACL, stored as a Long key into the ACL cache
Long acl;
// node metadata (stat)
public StatPersisted stat;
// names of the child nodes
private Set<String> children = null;
}
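DataNode is the in-memory representation of a single node. It holds the node's data, a reference to its parent, the names of its children, its ACL, and its metadata.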
1.3 StatPersisted
public class StatPersisted implements Record {
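// zxid of the transaction that created the node
private long czxid;
// zxid of the transaction that last modified the node's data
private long mzxid;
// creation time
private long ctime;
// last modification time
private long mtime;
// data version: starts at 0 and is incremented on every change to the node's data
private int version;
// child version: starts at 0 and is incremented on every change to the node's children
private int cversion;
// ACL version: starts at 0 and is incremented on every ACL change
private int aversion;
// session id of the owner if this is an ephemeral node; 0 for a persistent node
private long ephemeralOwner;
// zxid of the last change to the node's children
private long pzxid;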
}
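StatPersisted holds a node's detailed metadata and largely matches what the CLI get command prints. In the output, version appears as dataVersion and aversion as aclVersion; dataLength and numChildren are not stored in StatPersisted but are derived from the node's data and child set.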
[zk: localhost:2181(CONNECTED) 0] get /zk
test
cZxid = 0x2
ctime = Sat Dec 05 15:54:12 CST 2020
mZxid = 0x2
mtime = Sat Dec 05 15:54:12 CST 2020
pZxid = 0x65
cversion = 8
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 8
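The counters and zxids change in a fixed pattern. The sketch below models only that bookkeeping with a hypothetical StatSketch class that reuses the StatPersisted field names: the parent-side update follows the createNode code in section 2.1, and the data-side update shows the net effect of a successful setData (dataVersion + 1, mZxid set to the transaction's zxid). Timestamps are left out.

```java
// Hypothetical stand-in for StatPersisted: only the fields touched below.
class StatSketch {
    long czxid, mzxid, pzxid, ephemeralOwner;
    int version, cversion, aversion;

    // A new node starts with all zxids equal to the creating txn and versions at 0.
    static StatSketch forNewNode(long zxid, long ephemeralOwner) {
        StatSketch s = new StatSketch();
        s.czxid = zxid;
        s.mzxid = zxid;
        s.pzxid = zxid;
        s.ephemeralOwner = ephemeralOwner;
        return s;
    }

    // Net effect of a successful data update: dataVersion + 1, mZxid = txn zxid.
    void onSetData(long zxid) {
        version++;
        mzxid = zxid;
    }

    // Effect on the *parent* when a child is created (as in createNode):
    // cversion + 1 and pZxid = txn zxid; the parent's mZxid is untouched.
    void onChildCreated(long zxid) {
        cversion++;
        pzxid = zxid;
    }
}
```

Read against the `get /zk` output above: cZxid equals mZxid and dataVersion is 0 because the data has not changed since creation, while cversion = 8 and pZxid = 0x65 record later changes to the child list.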
1.4 WatchManager
public class WatchManager {
// path -> watchers: which watchers are registered on each node
private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
// watcher -> paths: which nodes each watcher is watching
private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
}
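WatchManager is the server-side watch registry; its client-side counterpart is the client watch registry ZKWatchManager.
- HashMap<String, HashSet<Watcher>> watchTable: maps a path to its watchers, i.e. which watchers are registered on that node;
- HashMap<Watcher, HashSet<String>> watch2Paths: maps a watcher to its paths, i.e. which nodes that watcher is watching.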
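Keeping both directions costs an extra map update per registration, but it lets the server drop everything a watcher registered (WatchManager.removeWatcher, used when a client connection goes away) without scanning every path. A minimal sketch of that two-way index, assuming a hypothetical SketchWatcher interface and WatchIndex class in place of ZooKeeper's Watcher and WatchManager:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical minimal watcher type; ZooKeeper's Watcher.process takes a WatchedEvent.
interface SketchWatcher {
    void process(String path);
}

// Hypothetical two-way index modeled on WatchManager's watchTable / watch2Paths.
class WatchIndex {
    final Map<String, Set<SketchWatcher>> watchTable = new HashMap<>();
    final Map<SketchWatcher, Set<String>> watch2Paths = new HashMap<>();

    // Register: update both directions so either side can be looked up directly.
    synchronized void addWatch(String path, SketchWatcher w) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(w);
        watch2Paths.computeIfAbsent(w, k -> new HashSet<>()).add(path);
    }

    // When a watcher goes away, the reverse index lists exactly the paths to clean up.
    synchronized void removeWatcher(SketchWatcher w) {
        Set<String> paths = watch2Paths.remove(w);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            Set<SketchWatcher> ws = watchTable.get(p);
            if (ws != null) {
                ws.remove(w);
                if (ws.isEmpty()) {
                    watchTable.remove(p);
                }
            }
        }
    }
}
```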
1.5 Watcher
public interface Watcher {
abstract public void process(WatchedEvent event);
}
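Watcher is the listener interface and declares a single callback method, process. A client registers an implementation of this method so that it is called back when a node changes.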
1.6 ReferenceCountedACLCache
public class ReferenceCountedACLCache {
// Long ACL key -> ACL list
final Map<Long, List<ACL>> longKeyMap = new HashMap<Long, List<ACL>>();
// ACL list -> Long ACL key
final Map<List<ACL>, Long> aclKeyMap = new HashMap<List<ACL>, Long>();
// reference count for each Long ACL key
final Map<Long, AtomicLongWithEquals> referenceCounter = new HashMap<Long, AtomicLongWithEquals>();
}
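ReferenceCountedACLCache converts between Long ACL keys and ACL lists and stores the mapping in both directions. Each DataNode keeps only a Long key, so nodes that share the same ACL list share one cached copy, which reduces the memory footprint of the whole DataTree.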
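The idea is interning plus reference counting. The sketch below is a hypothetical AclCacheSketch: the method names convertAcls, convertLong, and removeUsage are borrowed from ReferenceCountedACLCache, but the bodies are simplified, ACL entries are plain strings instead of org.apache.zookeeper.data.ACL, and keys are assumed to be handed out sequentially.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: intern each distinct ACL list once, hand out a Long key,
// and count how many nodes reference each key.
class AclCacheSketch {
    final Map<Long, List<String>> longKeyMap = new HashMap<>();
    final Map<List<String>, Long> aclKeyMap = new HashMap<>();
    final Map<Long, Long> referenceCounter = new HashMap<>();
    private long nextKey = 1; // assumption: sequential keys

    // Returns the shared key for this ACL list, creating one on first use.
    synchronized long convertAcls(List<String> acls) {
        Long key = aclKeyMap.get(acls);
        if (key == null) {
            key = nextKey++;
            List<String> copy = new ArrayList<>(acls);
            longKeyMap.put(key, copy);
            aclKeyMap.put(copy, key);
        }
        referenceCounter.merge(key, 1L, Long::sum);
        return key;
    }

    synchronized List<String> convertLong(long key) {
        return longKeyMap.get(key);
    }

    // Called when a node using this key goes away; frees the entry at zero.
    synchronized void removeUsage(long key) {
        Long count = referenceCounter.merge(key, -1L, Long::sum);
        if (count != null && count <= 0) {
            referenceCounter.remove(key);
            aclKeyMap.remove(longKeyMap.remove(key));
        }
    }
}
```

Two nodes created with equal ACL lists get the same key and the list is stored once; the entry disappears only after both nodes release it.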
2. Key Methods
2.1 DataTree.createNode
public String createNode(String path, byte data[], List<ACL> acl,
long ephemeralOwner, int parentCVersion, long zxid, long time)
throws KeeperException.NoNodeException,
KeeperException.NodeExistsException {
int lastSlash = path.lastIndexOf('/');
// get the parent path
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
StatPersisted stat = new StatPersisted();
stat.setCtime(time);
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
DataNode parent = nodes.get(parentName);
// make sure the parent exists
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
Set<String> children = parent.getChildren();
// fail if the parent already has a child with this name
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}
if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
// parent cversion + 1
parentCVersion++;
}
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
// convert the ACL list to a Long key
Long longval = aclCache.convertAcls(acl);
// create the DataNode
DataNode child = new DataNode(parent, data, longval, stat);
// register the child name with the parent
parent.addChild(childName);
// add the node to nodes
nodes.put(path, child);
// ephemeral node: record its path under the owning session so it can be deleted when the session ends
if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
}
// ------- non-essential code omitted
// fire NodeCreated for the new node
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
// fire NodeChildrenChanged on the parent
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
return path;
}
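The node-creation logic is straightforward; a few points are worth noting:
- The parent node is locked during creation, so the existence check and the insertion of the child are atomic with respect to any other thread that locks the same parent;
- For an ephemeral node, the path is added to ephemerals so that these ephemeral nodes can be deleted when the session ends;
- A NodeCreated event is fired for the new node, and a NodeChildrenChanged event for its parent.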
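The first point guards a classic check-then-act race: without the lock, two threads could both see "child absent" and both add it. A minimal sketch, assuming a hypothetical ParentSketch class that keeps only a child-name set and a cversion counter, where a false return stands in for NodeExistsException:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of createNode's synchronized(parent) check-then-act.
class ParentSketch {
    final Set<String> children = new HashSet<>(); // plain HashSet, guarded by 'this'
    int cversion;

    // true if this call created the child; false where createNode would throw NodeExistsException
    boolean createChild(String name) {
        synchronized (this) {
            if (children.contains(name)) {
                return false;
            }
            cversion++;
            children.add(name);
            return true;
        }
    }

    // Lets several threads try to create the same child; returns how many succeeded.
    static int raceCreate(String name, int threads) {
        ParentSketch parent = new ParentSketch();
        AtomicInteger created = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                if (parent.createChild(name)) {
                    created.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }
}
```

With the lock in place, `raceCreate("c", 8)` reports exactly one successful creation no matter how the threads interleave.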
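Both events are delivered through WatchManager.triggerWatch; the two-argument call used in createNode delegates to this three-argument version with supress set to null:
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {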
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// remove the path -> watchers mapping: a watch fires once and must be registered again to keep listening
watchers = watchTable.remove(path);
// no watchers on this path: return immediately
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
}
return null;
}
// remove this path from each watcher's set of watched paths
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
// fire the event on every watcher registered on the path
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
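// invoke the watcher callback; on the server this is typically the client connection, which forwards the event to the client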
w.process(e);
}
return watchers;
}
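It does two things:
- Removes the mapping from the path to its registered watchers, and removes the path from each of those watchers' watched-path sets. In other words, a watch fires only once; to keep listening, it must be registered again;
- Fires the event on every watcher registered on the path by calling its process method. On the server these watchers are typically the client connections, which send the event to the client, where the user's Watcher.process callback finally runs.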
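The one-shot behavior follows from removing the registration before notifying anyone. A minimal sketch, assuming a hypothetical OneShotWatches class with java.util.function.Consumer<String> in place of ZooKeeper's Watcher and the reverse map watch2Paths left out:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of triggerWatch's one-shot semantics.
class OneShotWatches {
    private final Map<String, Set<Consumer<String>>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, Consumer<String> watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // Returns how many watchers were notified.
    int trigger(String path, Set<Consumer<String>> suppress) {
        Set<Consumer<String>> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path); // one-shot: drop the registration first
        }
        if (watchers == null) {
            return 0;
        }
        int notified = 0;
        // Callbacks run outside the lock, as in WatchManager.triggerWatch.
        for (Consumer<String> w : watchers) {
            if (suppress != null && suppress.contains(w)) {
                continue;
            }
            w.accept(path);
            notified++;
        }
        return notified;
    }
}
```

A second trigger on the same path notifies nobody until the watcher registers again, which is why clients re-register inside their callbacks when they need continuous notifications.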
2.2 DataTree.killSession
void killSession(long session, long zxid) {
// take the set of ephemeral node paths created by this session
HashSet<String> list = ephemerals.remove(session);
if (list != null) {
for (String path : list) {
try {
// delete an ephemeral node created by this session
deleteNode(path, zxid);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting ephemeral node " + path+ " for session 0x"+ Long.toHexString(session));
}
} catch (NoNodeException e) {
LOG.warn("Ignoring NoNodeException for path " + path+ " while removing ephemeral for dead session 0x"+ Long.toHexString(session));
}
}
}
}
When a session ends, killSession removes that session's set of ephemeral node paths from ephemerals and deletes each node in turn.
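The bookkeeping that connects createNode and killSession fits in a few lines. A minimal sketch, assuming a hypothetical EphemeralSketch class where nodes maps paths to data and removing a path stands in for deleteNode(path, zxid); using computeIfAbsent instead of the original get-then-put is a choice of this sketch:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of DataTree's ephemerals map: sessionId -> ephemeral paths.
class EphemeralSketch {
    final Map<String, byte[]> nodes = new ConcurrentHashMap<>();
    final Map<Long, Set<String>> ephemerals = new ConcurrentHashMap<>();

    void create(String path, long ephemeralOwner) {
        nodes.put(path, new byte[0]);
        if (ephemeralOwner != 0) { // 0 means a persistent node
            Set<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
            synchronized (list) {
                list.add(path);
            }
        }
    }

    // Mirrors killSession: take the session's set out of the map, then delete each path.
    void killSession(long session) {
        Set<String> list = ephemerals.remove(session);
        if (list == null) {
            return;
        }
        for (String path : list) {
            nodes.remove(path); // DataTree calls deleteNode(path, zxid) here
        }
    }
}
```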
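2.3 Other Methods
- DataTree.setData: updates a node's data and fires the NodeDataChanged event;
- DataTree.getData: returns a node's data and, if a watcher is passed in, registers it on the node;
- DataTree.serialize: serializes the DataTree, calling serializeNode to serialize every node recursively; used when the tree has to be persisted (snapshots);
- DataTree.getACL: returns a node's ACL list or its Long ACL key;
...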
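The recursion in serialize is a pre-order walk: write the current node, then descend into each child using path + "/" + childName, starting from the root's path "". A minimal sketch, assuming a hypothetical SerializeSketch class that records paths in a list instead of writing to an OutputArchive; the trailing "/" mirrors the end-of-stream marker that DataTree.serialize writes after the walk, and children are kept sorted here only so the output order is predictable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the serialize/serializeNode recursion.
class SerializeSketch {
    // path -> child names; the root's path is "" as in DataTree
    private final Map<String, Set<String>> children = new HashMap<>();

    void addChild(String parentPath, String name) {
        children.computeIfAbsent(parentPath, k -> new TreeSet<>()).add(name);
    }

    List<String> serialize() {
        List<String> out = new ArrayList<>();
        serializeNode("", out);
        out.add("/"); // end-of-stream marker
        return out;
    }

    private void serializeNode(String path, List<String> out) {
        out.add(path); // the real code writes the path and the DataNode record here
        for (String child : children.getOrDefault(path, Set.of())) {
            serializeNode(path + "/" + child, out);
        }
    }
}
```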
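Summary
This section gave a brief analysis of DataTree. Many of its methods are implemented in a simple, direct way, with no elaborate design patterns and no fine-grained concurrency control.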
------over------