美文网首页
Zookeeper(二)-数据模型

Zookeeper(二)-数据模型

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-12-07 20:14 被阅读0次

    概述

    zookeeper的数据结构与linux文件系统很类似,和文件系统不同的是,zk的数据存储是结构化存储,没有文件和目录的概念,文件和目录被抽象成了节点(DataNode),DataNode组成DataTree。本节就来看下DataTree/DataNode相关源码,为后续学习打好基础。

    一、数据结构

    数据模型

    1.1 DataTree

    public class DataTree {
        // path -> DataNode的映射关系
        private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
        // 数据类型通知管理器
        private final WatchManager dataWatches = new WatchManager();
        // 子节点类通知管理器
        private final WatchManager childWatches = new WatchManager();
        // 客户端sessionId -> 该session创建的临时节点路径集合
        private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
        // 权限缓存
        private final ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();
    }
    

    DataTree是zk的内存数据结构,是有所有DataNode构成的树;

    • ConcurrentHashMap<String, DataNode> nodes:节点路径到节点对象DataNode的映射关系,存放该Tree上所有Node;
    • WatchManager dataWatches:数据类型通知管理器
    • WatchManager childWatches:子节点类通知管理器
    • Map<Long, HashSet<String>> ephemerals:客户端sessionId到该session创建的临时节点路径集合的映射;
    • ReferenceCountedACLCache aclCache:权限缓存

    1.2 DataNode

    public class DataNode implements Record {
        // 父节点
        DataNode parent;
        // 节点数据
        byte data[];
        // acl转换成long
        Long acl;
        // 节点描述信息
        public StatPersisted stat;
        // 子节点信息
        private Set<String> children = null;
    }
    

    DataNode是节点在内存中具体数据结构,包含节点数据、父节点信息、子节点信息、节点权限、节点描述信息;

    1.3 StatPersisted

    public class StatPersisted implements Record {
      // 节点创建id
      private long czxid;
      // 节点修改id
      private long mzxid;
      // 节点创建时间
      private long ctime;
      // 节点修改时间
      private long mtime;
      // 当前节点的数据版本号,初始版本为0,每对该节点的数据进行更改操作时加1
      private int version;
      // 当前节点的子节点版本号,初始值为0,每对该节点的子节点进行更改操作时加1
      private int cversion;
      // 当前节点的acl权限版本号,初始版本为0,对权限修改该时加1
      private int aversion;
      // 节点所有者 0标识anyone
      private long ephemeralOwner;
      //子节点修改时间
      private long pzxid;
     }
    

    StatPersisted是节点详细的描述信息,基本跟get命令返回数据一致,

    [zk: localhost:2181(CONNECTED) 0] get /zk
    test
    cZxid = 0x2
    ctime = Sat Dec 05 15:54:12 CST 2020
    mZxid = 0x2
    mtime = Sat Dec 05 15:54:12 CST 2020
    pZxid = 0x65
    cversion = 8
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 4
    numChildren = 8
    

    1.4 WatchManager

    public class WatchManager {
        // 路径到watcher的映射关系
        private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
        // watcher到路径的映射关系
        private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
    }
    

    WatchManager是服务器端Watch的注册表,跟客户端Watch注册表ZKWatchManage对应;

    • HashMap<String, HashSet<Watcher>> watchTable:路径到watcher的映射关系,即该节点上注册了哪些监听者;
    • HashMap<Watcher, HashSet<String>> watch2Paths:watcher到路径的映射关系,即该监听者注册监听了哪些节点;

    1.5 Watcher

    public interface Watcher {
        abstract public void process(WatchedEvent event);
    }
    

    Watcher监听者接口,只提供一个process监听处理方法;客户端可以通过回调把该方法实现注册到节点变更事件上;

    1.6 ReferenceCountedACLCache

    public class ReferenceCountedACLCache {
        // Long类型acl -> acl list映射
        final Map<Long, List<ACL>> longKeyMap = new HashMap<Long, List<ACL>>();
        // acl list -> Long类型acl映射
        final Map<List<ACL>, Long> aclKeyMap = new HashMap<List<ACL>, Long>();
        // Long类型acl引用计数
        final Map<Long, AtomicLongWithEquals> referenceCounter = new HashMap<Long, AtomicLongWithEquals>();
    }
    

    ReferenceCountedACLCache主要用于Long类型acl和acl list之间互相转换,并存储相互之间映射信息,以节省DataTree整个树形结构的内存占用;

    二、重点方法分析

    2.1 DataTree.createNode

    public String createNode(String path, byte data[], List<ACL> acl,
            long ephemeralOwner, int parentCVersion, long zxid, long time)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        // 获取父路径
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = new StatPersisted();
        stat.setCtime(time);
        stat.setMtime(time);
        stat.setCzxid(zxid);
        stat.setMzxid(zxid);
        stat.setPzxid(zxid);
        stat.setVersion(0);
        stat.setAversion(0);
        stat.setEphemeralOwner(ephemeralOwner);
        DataNode parent = nodes.get(parentName);
        // 判断父节点是否存在
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            Set<String> children = parent.getChildren();
            // 判断父节点下该子节点是否已经存在
            if (children.contains(childName)) {
                throw new KeeperException.NodeExistsException();
            }
            
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                //父节点cVersion + 1
                parentCVersion++;
            }    
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
            // acl转换成long类型
            Long longval = aclCache.convertAcls(acl);
            // 创建DataNode
            DataNode child = new DataNode(parent, data, longval, stat);
            // dataNode添加到父节点中
            parent.addChild(childName);
            // 添加到nodes中
            nodes.put(path, child);
            // 临时节点,把路径加到当前session创建临时节点的list中,当session断开时会用
            if (ephemeralOwner != 0) {
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
        }
        //-------忽略非重点代码
        // 触发新增节点事件
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        // 触发父节点的子节点变更事件
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
        return path;
    }
    

    创建节点代码逻辑很简单,主要注意几点:

    1. 创建节点时对父节点加锁,防止多线程并发在同一父节点下创建子节点;
    2. 如果创建的是临时节点,把节点路径加到ephemerals中,当session断开时会删除这些临时节点;
    3. 触发新增节点事件,同时触发父节点的子节点变更事件;
    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            // 把该路径 -> 该路径上注册的watcher,映射删除,即watch事件触发一次后,再监听的话必须再重新注册
            watchers = watchTable.remove(path);
            // 没有监听,直接返回
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
                }
                return null;
            }
            // 当前path从 watcher 监控的路径中删除
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // 触发path对应的watcher注册的事件
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // 执行用户watch回调方法
            w.process(e);
        }
        return watchers;
    }
    

    主要做下面两点:

    1. 把从该节点到该节点上注册的watcher的映射删除,把当前节点从 watcher监控的节点集合中删除,即watch事件只触发一次,再监听的话必须再重新注册;
    2. 触发节点对应的所有watcher注册的事件,即回调客户端的watch.process方法

    2.2 DataTree.killSession

    void killSession(long session, long zxid) {
        // 获取session创建的临时节点list
        HashSet<String> list = ephemerals.remove(session);
        if (list != null) {
            for (String path : list) {
                try {
                    //删除该session创建的临时节点
                    deleteNode(path, zxid);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Deleting ephemeral node " + path+ " for session 0x"+ Long.toHexString(session));
                    }
                } catch (NoNodeException e) {
                    LOG.warn("Ignoring NoNodeException for path " + path+ " while removing ephemeral for dead session 0x"+ Long.toHexString(session));
                }
            }
        }
    }
    

    session断开连接时,从ephemerals中获取当前session创建的临时节点list,并逐一删除;

    2.3 其他方法

    DataTree.setData:变更节点数据,同时触发NodeDataChanged事件;
    DataTree.getData:获取节点数据,同时watch不为空时注册watcher监听;
    DataTree.serialize:把DataTree序列化,当中会调用serializeNode递归序列化每一个子节点;在需要持久化存储时用到;
    DataTree.getACL:获取权限列表或Long类型acl;
    ...

    小结

    本节简单分析了DataTree,DataTree很多方法实现都很简单粗暴没有复杂的设计模式,也没有精细的并发控制。。。

    ------over------

    相关文章

      网友评论

          本文标题:Zookeeper(二)-数据模型

          本文链接:https://www.haomeiwen.com/subject/xhrtgktx.html