美文网首页
[Zookeeper] 数据模型 DataTree

[Zookeeper] 数据模型 DataTree

作者: LZhan | 来源:发表于2019-12-27 11:16 被阅读0次

1 前言

Zookeeper 数据模型-类关系图,如下所示

image

2 DataTree的事务请求分析

DataTree中的请求通常分为事务性请求和非事务性请求,而事务性请求则以方法 proceeTxn 为代表,现在创建节点为例分析下整个流程:

processTxn方法接收两个参数:

  • TxnHeader header
    事务记录的头部信息
public class TxnHeader implements Record {
  private long clientId; // 会话sessionId
  private int cxid;// 与客户端交互的id
  private long zxid;// 服务器端生成的事务id
  private long time;
  private int type;  // 事务操作的类型
}
  • Record txn
    事务记录的内容,由jute规范定义了序列化反序列化流程,各个事务操作都实现了Record接口

2.1 返回结果ProcessTxnResult初始化

ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
...

从传入的TxnHeader类型的参数获取头部信息,传入到返回的结果类中。

2.2 以create为例,剖析具体创建节点流程

switch (header.getType()) {
case OpCode.create:
    CreateTxn createTxn = (CreateTxn) txn;
    rc.path = createTxn.getPath();
    // 传入参数 1.节点路径 2.节点数据内容 3.节点权限 
    // 4.节点是否临时节点,为true传入header中的sessionid,false则传入0
    // 5.父节点的子节点列表版本号(从-1开始)
    // 6. header中的事务id 7.节点创建时间
    createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(),
            createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(),
            header.getZxid(), header.getTime());
    break;

调用createNode方法:

(1) 基础性赋值操作,主要是创建并初始化描述节点状态信息的类 StatPersisted

int lastSlash = path.lastIndexOf('/');
//获取父节点名称
String parentName = path.substring(0, lastSlash);
//获取当前节点名称
String childName = path.substring(lastSlash + 1);
//创建持久化状态类
StatPersisted stat = new StatPersisted();
//对于创建的新节点而言,创建时间与修改时间是一致的
stat.setCtime(time);
stat.setMtime(time);
//该节点被创建时的事务id
stat.setCzxid(zxid);
//该节点最近一次被修改的事务id
stat.setMzxid(zxid);
//该节点的子节点列表最近一次修改的事务id
stat.setPzxid(zxid);
//当前数据节点内容的版本号
stat.setVersion(0);
//当前数据节点权限的版本号
stat.setAversion(0);
//设置临时节点owner,是临时节点则为sessionId否则为0
stat.setEphemeralOwner(ephemeralOwner);
//获取父节点
DataNode parent = nodes.get(parentName);
if (parent == null) {
    throw new KeeperException.NoNodeException();
}

(2) 利用synchronized锁住父节点parent

synchronized (parent) {
    Set<String> children = parent.getChildren();
    //对应已存在的节点不能重复创建
    if (children.contains(childName)) {
        throw new KeeperException.NodeExistsException();
    }
    // 如果参数父节点子节点列表版本号为-1,为什么要加-1的判断 难道只有-1和0吗
    if (parentCVersion == -1) {
        parentCVersion = parent.stat.getCversion();
        parentCVersion++;
    }
    parent.stat.setCversion(parentCVersion);
    parent.stat.setPzxid(zxid);
    Long longval = aclCache.convertAcls(acl);
    //初始化节点
    DataNode child = new DataNode(parent, data, longval, stat);
    parent.addChild(childName);
    //添加到nodes路径中
    nodes.put(path, child);
    //创建的节点是临时节点
    if (ephemeralOwner != 0) {
        // 获取该sessionId创建的所有临时节点
        HashSet<String> list = ephemerals.get(ephemeralOwner);
        if (list == null) {
            list = new HashSet<String>();
            ephemerals.put(ephemeralOwner, list);
        }
        synchronized (list) {
            list.add(path);
        }
    }
}

这里有一个疑问,在设置父节点的子节点列表版本变更时,有一个if条件判断,满足parentCversion=-1时,才会自增。
其中有一个方法是convertAcls,这是权限缓存ReferenceCountedACLCache中的方法

    public synchronized Long convertAcls(List<ACL> acls) {
        if (acls == null)
            return OPEN_UNSAFE_ACL_ID;

        // get the value from the map
        Long ret = aclKeyMap.get(acls);
        if (ret == null) {
            ret = incrementIndex();
            longKeyMap.put(ret, acls);
            aclKeyMap.put(acls, ret);
        }

        addUsage(ret);

        return ret;
    }

aclKeyMap类型为Map<List< ACL >, Long>,每一个acl的组合都对应一个整型,如果之前不存在则对 aclIndex 进行自增,同时将这一个key-value对存储到aclKeyMap和longKeyMap中。

(3) 触发watcher事件,分别触发当前数据节点的NodeCreated事件和子节点列表的NodeChildrenChanged事件

dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);

调用WatchManager的triggerWatch事件

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    //初始化一个路径为path并且事件类型为type的Watcher事件
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // 建立了watcher事件后,再去remove掉当前路径(其实就是一次Watcher事件只会触发一次)
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            // 同样也是删除当前路径
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // 每个Watcher执行process方法
        w.process(e);
    }
    return watchers;
}
特别提醒:创建节点时是不能够多层路径创建的,如果父节点不存在请先创建父节点。

疑惑解答:在设置父节点的子节点列表版本变更时,有一个if条件判断,满足parentCversion=-1时,才会自增?

在ZooKeeper源码中,启动QuorumPeerMain类,用zkCli.sh创建客户端,debug发现,parentCversion作为参数传过来之前就已经自增过了,这个if条件是针对的默认的/zookeeper节点,该节点是zookeeper默认存在的,其cversion初始为-1

比如说当下创建一个新节点 /test ,创建后get /test,其cversion为0;

再创建一个 /test/node1,此时parentCversion已经是1了

3 DataTree的非事务请求分析

DataTree中的getData就是非事务性请求,getData的逻辑比较简单就不解析了。

相关文章

网友评论

      本文标题:[Zookeeper] 数据模型 DataTree

      本文链接:https://www.haomeiwen.com/subject/mnltoctx.html