美文网首页程序员
ZooKeeper源码分析之Session

ZooKeeper源码分析之Session

作者: 兽怪海北 | 来源:发表于2020-10-27 13:17 被阅读0次

    session在zookeeper中是一个不能避开的概念,临时节点(包括临时普通节点和临时顺序节点)都是与session关联的,临时节点将在session超期后被删除。本篇我们来看一下session的创建与销毁,过期session的处理等内容。

    session创建

    客户端的连接请求到达服务器后最早由processConnectRequest处理,我们分段来看一下processConnectRequest的处理流程。

    ConnectRequest connReq = new ConnectRequest();
    connReq.deserialize(bia, "connect");
    

    反序列化connect请求,connect请求参数如下:

    private int protocolVersion;
    private long lastZxidSeen;
    private int timeOut;
    private long sessionId;
    private byte[] passwd;
    
    字段名 含义
    protocolVersion 协议版本号
    lastZxidSeen 客户端过去看到的最大的zxid
    timeOut session的超时时间
    sessionId session的id,新连接传0,重连传之前服务器返回的sessionid
    passwd session对应的密码,由服务器返回给客户端,重连时需要传递该字段

    判断是否是readonly请求,ReadOnlyZooKeeperServer只处理readonly的连接,ReadOnlyZooKeeperServer在前面的集群启动中我们讲过,就是使得服务器在选举过程中或者发生分区时依旧可以读数据。

    boolean readOnly = false;
    try {
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
    }
    if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    

    判断客户端最后看到的zxid是否大于我们的最后处理zxid,如果是则关闭连接,让客户端尝试连接有最新数据的其它服务器。

    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
                     + cnxn.getRemoteSocketAddress()
                     + " as it has seen zxid 0x"
                     + Long.toHexString(connReq.getLastZxidSeen())
                     + " our last zxid is 0x"
                     + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                     + " client must try another server";
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
    }
    

    读取session的超时值,校验并规格化:

    int sessionTimeout = connReq.getTimeOut();
    byte[] passwd = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    

    创建或重连session:

    if (sessionId == 0) {
        long id = createSession(cnxn, passwd, sessionTimeout);
    } else {
        long clientSessionId = connReq.getSessionId();
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
    }
    

    重连session我们放到下一节分析,我们来看一下createSession方法的逻辑,代码不长,我们就直接一次性放出来。

    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        if (passwd == null) {
            passwd = new byte[0];
        }
        long sessionId = sessionTracker.createSession(timeout);
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        cnxn.setSessionId(sessionId);
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
        submitRequest(si);
        return sessionId;
    }
    
    1. sessionTracker.createSession(timeout),创建session并拿到sessionid。SessionTracker是一个接口,有4个实现类。

      类名 用途
      SessionTrackerImpl 单机使用。
      LocalSessionTracker 继承了SessionTrackerImpl,给UpgradeableSessionTracker用,UpgradeableSessionTracker包含一个LocalSessionTracker。
      UpgradeableSessionTracker LeaderSessionTracker和LearnerSessionTracker的抽象基类。
      LeaderSessionTracker leader使用。
      LearnerSessionTracker follower和observer使用。
    2. 生成session密码。

    3. 提交session创建请求。

    我们来看一下单机模式下sessionTracker.createSession(timeout)做了些什么:

    public long createSession(int sessionTimeout) {
        long sessionId = nextSessionId.getAndIncrement();
        trackSession(sessionId, sessionTimeout);
        return sessionId;
    }
    
    public synchronized boolean trackSession(long id, int sessionTimeout) {
        boolean added = false;
        SessionImpl session = sessionsById.get(id);
        if (session == null) {
            session = new SessionImpl(id, sessionTimeout);
        }
        SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
        if (existedSession != null) {
            session = existedSession;
        } else {
            added = true;
        }
        updateSessionExpiry(session, sessionTimeout);
        return added;
    }
    

    首先得到下一个sessionid,然后将这个sessionid纳入管理。

    来看一下leader模式下sessionTracker.createSession(timeout)做了些什么:

    public long createSession(int sessionTimeout) {
        if (localSessionsEnabled) {
            return localSessionTracker.createSession(sessionTimeout);
        }
        return globalSessionTracker.createSession(sessionTimeout);
    }
    

    localSessionsEnabled由配置文件决定(zoo.cfg),这个选项决定了UpgradeableSessionTracker是使用LocalSessionTracker还是SessionTrackerImpl来实现。使用LocalSessionTracker的情况下,创建session时不会扩散到整个集群,而在需要升级时,根据localSessionsUpgradingEnabled的值确定是否需要将本地session升级为全局session扩散到整个集群。

    再来看一下follower或者observer模式下sessionTracker.createSession(timeout)做了些什么:

    public long createSession(int sessionTimeout) {
        if (localSessionsEnabled) {
            return localSessionTracker.createSession(sessionTimeout);
        }
        return nextSessionId.getAndIncrement();
    }
    

    判断localSessionsEnabled是否开启,如果开启了,创建一个本地session,如果没有,创建一个全局session。

    接下来我们看session创建请求在processor链的处理过程。

    先看单机:

    1. 首先是PrepRequestProcessor:

      int to = request.request.getInt();
      request.setTxn(new CreateSessionTxn(to));
      request.request.rewind();
      zks.sessionTracker.trackSession(request.sessionId, to);
      zks.setOwner(request.sessionId, request.getOwner());
      

      很简单,调用SessionTracker的trackSession方法。

    2. 再看FinalRequestProcessor:

      applyRequest

      ProcessTxnResult rc = zks.processTxn(request);
      

      processTxn

      processTxnForSessionEvents(request, hdr, request.getTxn());
      

      processTxnForSessionEvents

      private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
          int opCode = (request == null) ? hdr.getType() : request.type;
          long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
          if (opCode == OpCode.createSession) {
              if (hdr != null && txn instanceof CreateSessionTxn) {
                  CreateSessionTxn cst = (CreateSessionTxn) txn;
                  sessionTracker.commitSession(sessionId, cst.getTimeOut());
              } else if (request == null || !request.isLocalSession()) {
                  LOG.warn("*****>>>>> Got {} {}",  txn.getClass(), txn.toString());
              }
          } else if (opCode == OpCode.closeSession) {
              sessionTracker.removeSession(sessionId);
          }
      }
      

    很简单,调用SessionTracker的commitSession方法。

    case OpCode.createSession: {
        lastOp = "SESS";
        updateStats(request, lastOp, lastZxid);
        zks.finishSessionInit(request.cnxn, true);
        return;
    }
    

    zks.finishSessionInit给客户端返回正确的响应,包括session超时值,sessionid和session对应的密码。

    再看leader:

    分两种情况,开启了localSessionsEnabled和没开启localSessionsEnabled。

    先看开启了localSessionsEnabled的情况:

    1. 首先是PrepRequestProcessor:

      pRequestHelper

      case OpCode.createSession:
      case OpCode.closeSession:
          if (!request.isLocalSession()) {
              pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
          }
          break;
      

      如果是客户端发来的请求,不做任何处理。如果是follower或者observer发来的请求,则生成hdr头和txn记录,调用SessionTracker的trackSession方法。

    2. 然后是ProposalRequestProcessor:

      processRequest

      if (shouldForwardToNextProcessor(request)) {
          nextProcessor.processRequest(request);
      }
      if (request.getHdr() != null) {
          try {
              zks.getLeader().propose(request);
          } catch (XidRolloverException e) {
              throw new RequestProcessorException(e.getMessage(), e);
          }
          syncProcessor.processRequest(request);
      }
      

    对于客户端发来的请求,hdr头为空,什么也不做,直接交给下一个processor处理。如果是follower或者observer发来的请求则提交提案,等待半数节点完成session创建。

    1. 最后是FinalRequestProcessor:

      与单机模式下的情况完全相同。

    再看没开启localSessionsEnabled的情况:

    1. 首先是PrepRequestProcessor:

      pRequestHelper

      case OpCode.createSession:
      case OpCode.closeSession:
          if (!request.isLocalSession()) {
              pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
          }
          break;
      

      pRequest2Txn

      int to = request.request.getInt();
      request.setTxn(new CreateSessionTxn(to));
      request.request.rewind();
      zks.sessionTracker.trackSession(request.sessionId, to);
      zks.setOwner(request.sessionId, request.getOwner());
      

      生成hdr头和txn记录,调用SessionTracker的trackSession方法。

    2. 然后是ProposalRequestProcessor:

      processRequest

      if (shouldForwardToNextProcessor(request)) {
          nextProcessor.processRequest(request);
      }
      if (request.getHdr() != null) {
          try {
              zks.getLeader().propose(request);
          } catch (XidRolloverException e) {
              throw new RequestProcessorException(e.getMessage(), e);
          }
          syncProcessor.processRequest(request);
      }
      

    提交提案,等待半数节点完成session创建。

    1. 最后是FinalRequestProcessor:

      与单机模式下的情况完全相同。

    然后看follower:

    分两种情况,开启了localSessionsEnabled和没开启localSessionsEnabled。

    先看开启了localSessionsEnabled的情况:

    1. 首先是FollowerRequestProcessor:

      processRequest

      upgradeRequest = zks.checkUpgradeSession(request);
      if (upgradeRequest != null) {
          queuedRequests.add(upgradeRequest);
      }
      

      如果需要升级session,发送createSession请求来升级session。

      run

      case OpCode.createSession:
      case OpCode.closeSession:
          if (!request.isLocalSession()) {
              zks.getFollower().request(request);
          }
          break;
      

      如果是升级session的请求,则将请求转给leader处理。如果是客户端发来的请求,不做任何处理。

    2. 最后是FinalRequestProcessor:

      与单机模式下的情况完全相同。

    再看没开启localSessionsEnabled的情况:

    1. 首先是FollowerRequestProcessor:

      run

      case OpCode.createSession:
      case OpCode.closeSession:
          if (!request.isLocalSession()) {
              zks.getFollower().request(request);
          }
          break;
      

      将请求转给leader处理。

    2. 最后是FinalRequestProcessor:

      与单机模式下的情况完全相同。

    observer的过程跟follower差不多,就不分析了。

    大家可能会疑惑,本地session的id是不是会跟全局session的id冲突,比如客户端连接了服务器1创建了一个session,另一个客户端连接服务器2创建了一个session,这两个session的id是不是会冲突。实际上zookeeper解决这个问题的方法是每个server根据server的id不同,创建的session的起始值也不一样,所以不会冲突。这个初始值还与当前时间有关,这样来避免重启后的sessionid(重启后会从数据库快照和事物日志中重建session,也就是说session实际上也是持久化的)与重启前的sessionid冲突,代码如下:

    public static long initializeNextSessionId(long id) {
        long nextSid;
        nextSid = (Time.currentElapsedTime() << 24) >>> 8;
        nextSid = nextSid | (id << 56);
        if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
            ++nextSid;  // this is an unlikely edge case, but check it just in case
        }
        return nextSid;
    }
    
    session重连

    入口是:

    public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
        if (checkPasswd(sessionId, passwd)) {
            revalidateSession(cnxn, sessionId, sessionTimeout);
        } else {
            LOG.warn(
                "Incorrect password from {} for session 0x{}",
                cnxn.getRemoteSocketAddress(),
                Long.toHexString(sessionId));
            finishSessionInit(cnxn, false);
        }
    }
    

    逻辑很简单,检查session对应的密码是否正确,若不正确,给客户端返回错误。若正确,调用revalidateSession。

    protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
        boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
        finishSessionInit(cnxn, rc);
    }
    

    调用sessionTracker.touchSession,根据结果给客户端返回正确或错误。touchSession返回session是否没过期,如果没过期还会更新过期时间。

    session关闭:

    客户端正常关闭时发送closeSession包,我们来看一下closeSession包的处理过程。

    先看单机:

    1. 首先是PrepRequestProcessor:

      if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
          request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
      }
      zks.sessionTracker.setSessionClosing(request.sessionId);
      

      如果开启了zookeeper.closeSessionTxn.enabled,则设置tnx为所有需要删除的临时节点的路径,默认开启。设置该session的状态为正在关闭。

    2. 最后是FinalRequestProcessor

      applyRequest

      ProcessTxnResult rc = zks.processTxn(request);
      if (request.type == OpCode.closeSession && connClosedByClient(request)) {
          if (closeSession(zks.serverCnxnFactory, request.sessionId)
              || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
              return rc;
          }
      }
      

      ZooKeeperServer的processTnx方法

      sessionTracker.removeSession(sessionId);
      if (txn != null) {
          killSession(sessionId, header.getZxid(),
                  ephemerals.remove(sessionId),
                  ((CloseSessionTxn) txn).getPaths2Delete());
      } else {
          killSession(sessionId, header.getZxid());
      }
      

      从sessionTracker中移除session,并调用killSession方法

      void killSession(long session, long zxid, Set<String> paths2DeleteLocal,
              List<String> paths2DeleteInTxn) {
          if (paths2DeleteInTxn != null) {
              deleteNodes(session, zxid, paths2DeleteInTxn);
          }
          if (paths2DeleteLocal == null) {
              return;
          }
          if (paths2DeleteInTxn != null) {
              for (String path: paths2DeleteInTxn) {
                  paths2DeleteLocal.remove(path);
              }
              if (!paths2DeleteLocal.isEmpty()) {
                  LOG.warn(
                      "Unexpected extra paths under session {} which are not in txn 0x{}",
                      paths2DeleteLocal,
                      Long.toHexString(zxid));
              }
          }
          deleteNodes(session, zxid, paths2DeleteLocal);
      }
      

      删除session关联的临时节点

      processRequest

      if (request.type == OpCode.closeSession) {
          cnxn.sendCloseSession();
      }
      

      关闭与客户端的连接。

    再看其它:

    其它情况与单机差不多,不同的是如果是一个本地session,closeSession的请求只需要在本地做,且不需要清理临时节点,不需要扩散到整个集群,因为如果session关联了临时节点,该session会被升级为全局session。如果是全局session,closeSession的请求需要扩散到整个集群。

    session清理

    入口是SessionTrackerImpl的run方法:

    public void run() {
        try {
            while (running) {
                long waitTime = sessionExpiryQueue.getWaitTime();
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                    continue;
                }
                for (SessionImpl s : sessionExpiryQueue.poll()) {
                    ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
    }
    

    sessionExpiryQueue是一个ExpiryQueue,expire的间隔由zoo.cfg中的tickTime决定,默认值是3000。

    ExpiryQueue的实现比较精妙,主要有三个接口方法update,getWaitTime和poll,update往队列中放,getWaitTime获取到下次执行需要等待的时间,poll获取本次需要过期的session的集合。服务器每次收到一个请求(包括心跳请求)都会调用SessionTracker的touchSession方法,touchSession会调用update方法。

    public Long update(E elem, int timeout) {
        Long prevExpiryTime = elemMap.get(elem);
        long now = Time.currentElapsedTime();
        Long newExpiryTime = roundToNextInterval(now + timeout);
        if (newExpiryTime.equals(prevExpiryTime)) {
            return null;
        }
        Set<E> set = expiryMap.get(newExpiryTime);
        if (set == null) {
            set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
            if (existingSet != null) {
                set = existingSet;
            }
        }
        set.add(elem);
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiryTime;
    }
    

    首先判断新过期的时间是否与之前计算的时间相同,如果相同直接返回。从expiryMap中获取需要在newExpiryTime过期的集合,如果没有则新建一个,将元素放入该集合中,然后将元素从旧集合中移除。roundToNextInterval以expirationInterval(tickTime)向上取整,比如传入时间是1500,expirationInterval是3000,则计算出的结果是3000。

    public long getWaitTime() {
        long now = Time.currentElapsedTime();
        long expirationTime = nextExpirationTime.get();
        return now < expirationTime ? (expirationTime - now) : 0L;
    }
    

    获取到下次过期时间的间隔。

    public Set<E> poll() {
        long now = Time.currentElapsedTime();
        long expirationTime = nextExpirationTime.get();
        if (now < expirationTime) {
            return Collections.emptySet();
        }
        Set<E> set = null;
        long newExpirationTime = expirationTime + expirationInterval;
        if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
            set = expiryMap.remove(expirationTime);
        }
        if (set == null) {
            return Collections.emptySet();
        }
        return set;
    }
    

    拉去下一个过期的集合,并更新nextExpirationTime。

    继续看session清理,setSessionClosing(s.sessionId)将session状态置为关闭中,expirer.expire(s)将session过期的真正逻辑。我们来看expirer.expire(s)做了什么:

    public void expire(Session session) {
        long sessionId = session.getSessionId();
        close(sessionId);
    }
    

    然后就是跟正常客户端发来的关闭session的请求一样处理。

    值得注意的是,follower收到客户端的非写请求后不会转发到leader,而全局session的超时清理是由leader处理的,如果不做任何处理,客户端的非写操作(包括ping)leader都感知不到,就会导致全局session被意外清理。ZooKeeper的解决方式是LearnerSessionTracker维护了一个touchTable,在收到leader发来的PING请求时(leader发送PING请求的间隔大概是expirationInterval的一半),为touchTable的所有元素发送ping请求,然后清空touchTable。

    相关文章

      网友评论

        本文标题:ZooKeeper源码分析之Session

        本文链接:https://www.haomeiwen.com/subject/nkfxvktx.html