美文网首页Zookeeper
ZK 默认watch为什么只生效一次

ZK 默认watch为什么只生效一次

作者: Alen_ab56 | 来源:发表于2022-01-16 19:31 被阅读0次

    以create节点为例
    有2种请求流转方式
    第一种:client-follower-leader-client
    第二种:client-leader-client

    假设第一种:
    client发送给follower,那么follower会转发请求到leader,leader发送proposal提案给所有的follower,follower收到proposal后交给SyncRequestProcessor处理,主要是将事务写到事务日志,然后回复ACK给leader;leader收到过半ACK后下发COMMIT,follower提交事务时会调用DataTree#createNode,并在其中触发watch。相关代码如下:
    Follower#processPacket
    // Excerpt: handling of a PROPOSAL packet received from the leader.
    case Leader.PROPOSAL:
    // Decode the packet payload into a transaction header (filled in place)
    // and the transaction record itself.
    TxnHeader hdr = new TxnHeader();
    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
    // A zxid other than lastQueued + 1 is only logged as a warning;
    // processing continues regardless.
    if (hdr.getZxid() != lastQueued + 1) {
    LOG.warn("Got zxid 0x"
    + Long.toHexString(hdr.getZxid())
    + " expected 0x"
    + Long.toHexString(lastQueued + 1));
    }
    // Remember the zxid of the most recently received proposal.
    lastQueued = hdr.getZxid();

            // For a reconfig transaction the record is treated as a SetDataTxn whose
            // data holds the configuration text; it is parsed and stored as the last
            // seen quorum verifier.
            // NOTE(review): new String(byte[]) uses the platform default charset - confirm
            // this is intended.
            if (hdr.getType() == OpCode.reconfig){
               SetDataTxn setDataTxn = (SetDataTxn) txn;       
               QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
               self.setLastSeenQuorumVerifier(qv, true);                               
            }
            
            // Hand the transaction to the follower server for logging.
            // NOTE(review): presumably this queues the request for the sync processor,
            // which persists it to the transaction log - confirm against logRequest.
            fzk.logRequest(hdr, txn);
            break;
    

    DataTree#createNode
    /**
     * Creates the node {@code path} beneath its already-existing parent, updates
     * quota bookkeeping, and finally fires the data watch on the new path and the
     * child watch on the parent.
     *
     * @param path           full path of the node to create; split at its last '/'
     *                       into parent path and child name
     * @param data           payload of the new node, may be {@code null}
     * @param acl            ACL list, converted through {@code aclCache} before storing
     * @param ephemeralOwner owner id used to classify the node (container, TTL,
     *                       or ephemeral when non-zero)
     * @param parentCVersion child version to set on the parent; {@code -1} means
     *                       "parent's current cversion plus one"
     * @param zxid           transaction id recorded as czxid/mzxid/pzxid of the new
     *                       node and as pzxid of the parent
     * @param time           timestamp recorded as ctime and mtime of the new node
     * @param outputStat     when non-null, passed to {@code child.copyStat(...)}
     * @throws KeeperException.NoNodeException     if the parent path is not in {@code nodes}
     * @throws KeeperException.NodeExistsException if the parent already lists a child
     *                                             with the same name
     */
    public void createNode(final String path, byte data[], List<ACL> acl,
            long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
        // Split "<parent>/<child>" at the final slash.
        final int slashIdx = path.lastIndexOf('/');
        final String parentName = path.substring(0, slashIdx);
        final String childName = path.substring(slashIdx + 1);

        // Stat for the brand-new node: every zxid/time field starts at the
        // creating transaction, both versions start at zero.
        final StatPersisted childStat = new StatPersisted();
        childStat.setCtime(time);
        childStat.setMtime(time);
        childStat.setCzxid(zxid);
        childStat.setMzxid(zxid);
        childStat.setPzxid(zxid);
        childStat.setVersion(0);
        childStat.setAversion(0);
        childStat.setEphemeralOwner(ephemeralOwner);

        final DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }

        // All changes to the parent and the node map happen under the parent's lock.
        synchronized (parent) {
            if (parent.getChildren().contains(childName)) {
                throw new KeeperException.NodeExistsException();
            }

            // -1 is the "derive it" marker: bump the parent's current cversion by one.
            final int newCversion = (parentCVersion == -1)
                    ? parent.stat.getCversion() + 1
                    : parentCVersion;
            parent.stat.setCversion(newCversion);
            parent.stat.setPzxid(zxid);

            final Long aclRef = aclCache.convertAcls(acl);
            final DataNode child = new DataNode(data, aclRef, childStat);
            parent.addChild(childName);
            nodes.put(path, child);

            // Record the path in the bookkeeping set matching its ephemeral type.
            final EphemeralType kind = EphemeralType.get(ephemeralOwner);
            if (kind == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (kind == EphemeralType.TTL) {
                ttls.add(path);
            } else if (ephemeralOwner != 0) {
                HashSet<String> ownedPaths = ephemerals.get(ephemeralOwner);
                if (ownedPaths == null) {
                    ownedPaths = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, ownedPaths);
                }
                synchronized (ownedPaths) {
                    ownedPaths.add(path);
                }
            }

            if (outputStat != null) {
                child.copyStat(outputStat);
            }
        }

        // Nodes created under the quota subtree update the quota structures.
        if (parentName.startsWith(quotaZookeeper)) {
            final String quotaTarget = parentName.substring(quotaZookeeper.length());
            if (Quotas.limitNode.equals(childName)) {
                // A limit node: register the path it applies to in the trie.
                pTrie.addPath(quotaTarget);
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(quotaTarget);
            }
        }

        // If some prefix of this path carries a quota, account for the new node.
        final String quotaPrefix = getMaxPrefixWithQuota(path);
        if (quotaPrefix != null) {
            updateCount(quotaPrefix, 1);
            final int byteCount = (data == null) ? 0 : data.length;
            updateBytes(quotaPrefix, byteCount);
        }

        // Fire watches last: data watch on the new path, child watch on the parent
        // (an empty parent name stands for the root "/").
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        final String watchedParent = parentName.equals("") ? "/" : parentName;
        childWatches.triggerWatch(watchedParent, Event.EventType.NodeChildrenChanged);
    }
    

    WatchManager#triggerWatch
    /**
     * Delivers an event of the given type for {@code path} to every watcher
     * registered on that path, unregistering them in the process.
     *
     * <p>The path's entry is removed from {@code watchTable} and the path is
     * removed from each affected watcher's set in {@code watch2Paths} before any
     * watcher is notified, so a registration is consumed by the first trigger.
     *
     * @param path    path whose watchers should be notified
     * @param type    event type placed in the delivered {@code WatchedEvent}
     * @param supress watchers that must not be notified; may be {@code null}
     * @return the set of watchers that were registered on the path (including any
     *         suppressed ones), or {@code null} when none were registered
     */
    Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent event = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet<Watcher> fired;

        // Unregister under the manager's lock; notify only after releasing it.
        synchronized (this) {
            fired = watchTable.remove(path);
            boolean nothingRegistered = (fired == null) || fired.isEmpty();
            if (nothingRegistered) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            // Keep the reverse index (watcher -> paths) in step with watchTable.
            for (Watcher watcher : fired) {
                HashSet<String> watchedPaths = watch2Paths.get(watcher);
                if (watchedPaths != null) {
                    watchedPaths.remove(path);
                }
            }
        }

        for (Watcher watcher : fired) {
            boolean suppressed = (supress != null) && supress.contains(watcher);
            if (!suppressed) {
                watcher.process(event);
            }
        }
        return fired;
    }

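    看这2部分:triggerWatch在触发时,先把该path对应的watcher集合从watchTable中整体移除,再把这个path从每个watcher在watch2Paths中的记录里删掉,所以默认的watch触发一次之后就失效了,想继续监听必须重新注册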

    1. watchers = watchTable.remove(path);
      2. for (Watcher w : watchers) {
      HashSet<String> paths = watch2Paths.get(w);
      if (paths != null) {
      paths.remove(path);
      }
      }

    ——————源码面前,了无秘密——————

    相关文章

      网友评论

        本文标题:ZK 默认watch为什么只生效一次

        本文链接:https://www.haomeiwen.com/subject/lcnxhrtx.html