美文网首页分布式技术干货Java学习笔记
zk源码阅读21:会话管理SessionTrackerImp源码

zk源码阅读21:会话管理SessionTrackerImp源码

作者: 赤子心_d709 | 来源:发表于2017-07-19 17:31 被阅读127次

摘要

本节讲解会话,会话状态以及会话的创建,针对源码
SessionTrackerImp进行展开,主要讲解

  会话定义
  会话状态,状态机
  会话创建
    sessionId唯一性保证
  会话管理
    分桶策略
    会话激活
      激活流程
      何时激活
    超时检测
  会话清理
  会话重连

会话

客户端与服务端之间任何交互操作都与会话息息相关,如临时节点的生命周期、客户端请求的顺序执行、Watcher通知机制等。Zookeeper的连接与会话就是客户端通过实例化Zookeeper对象来实现客户端与服务端创建并保持TCP连接的过程.

会话状态

在Zookeeper客户端与服务端成功完成连接创建后,就创建了一个会话,Zookeeper会话在整个运行期间的生命周期中,会在不同的会话状态中之间进行切换,这些状态可以分为CONNECTING、CONNECTED、RECONNECTING、RECONNECTED、CLOSE等。

一旦客户端开始创建Zookeeper对象,那么客户端状态就会变成CONNECTING状态,同时客户端开始尝试连接服务端,连接成功后,客户端状态变为CONNECTED,通常情况下,由于断网或其他原因,客户端与服务端之间会出现断开情况,一旦碰到这种情况,Zookeeper客户端会自动进行重连服务,同时客户端状态再次变成CONNCTING,直到重新连上服务端后,状态又变为CONNECTED,在通常情况下,客户端的状态总是介于CONNECTING和CONNECTED之间。但是,如果出现诸如会话超时、权限检查或是客户端主动退出程序等情况,客户端的状态就会直接变更为CLOSE状态。


zk会话状态变更

其中CONNECTING,CONNECTED在上一节sendThread中都有提到
关于AUTH_FAILED,SESSION_EXPIRED,CONNECTION_LOSS的情况在ClientCnxn#conLossPacket中

    private void conLossPacket(Packet p) {
        if (p.replyHeader == null) {
            return;
        }
        switch (state) {
        case AUTH_FAILED:
            p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
            break;
        case CLOSED:
            p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
            break;
        default:
            p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
        }
        finishPacket(p);
    }

会话创建

这里就讲最底层的会话创建以及会话的数据结构
参照 SessionTrackerImpl.SessionImpl数据结构

    public static class SessionImpl implements Session {
        SessionImpl(long sessionId, int timeout, long expireTime) {
            this.sessionId = sessionId;
            this.timeout = timeout;
            this.tickTime = expireTime;
            isClosing = false;
        }

        final long sessionId;//会话id,全局唯一
        final int timeout;//会话超时时间
        long tickTime;//下次会话的超时时间点,会不断刷新
        boolean isClosing;//是否被关闭,如果关闭则不再处理该会话的新请求

        Object owner;

        public long getSessionId() { return sessionId; }
        public int getTimeout() { return timeout; }
        public boolean isClosing() { return isClosing; }
    }

sessionId唯一性的保证

会话id要保证全局唯一,算法如下

    public static long initializeNextSession(long id) {
        long nextSid = 0;
        nextSid = (System.currentTimeMillis() << 24) >>> 8;
        nextSid =  nextSid | (id <<56);
        return nextSid;
    }

id表示配置在myid文件中的值,通常是一个整数,如1、2、3。
该算法的高8位确定了所在机器,后56位使用当前时间的毫秒表示进行随机。

会话管理

主要分为

  分桶策略
  会话激活
  超时检测

先介绍会话管理需要用到的数据结构

SessionTrackerImpl数据结构

作用列举如下

属性 作用
LOG 日志
sessionsById key是sessionId,value是对应的会话
sessionSets key是某个过期时间,value是会话集合,表示这个过期时间过后就超时的会话集合
sessionsWithTimeout key是sessionId,value是该会话的超时周期(不是时间点)
nextSessionId 一下个会话的id
nextExpirationTime 下一次进行超时检测的时间
expirationInterval 超时检测的周期,多久检测一次
expirer 用于server检测client超时之后给client发送 会话关闭的请求
running 超时检测的线程是否在运行
currentTime 当前时间

主要注意sessionsById,sessionSets两个集合,在分桶策略和超时检测策略中去理解

分桶策略

Zookeeper的会话管理主要是通过SessionTracker来负责,其采用了分桶策略(将类似的会话放在同一区块中进行管理)进行管理,以便Zookeeper对会话进行不同区块的隔离处理以及同一区块的统一处理。


分桶策略

*** Zookeeper将所有的会话都分配在不同的区块一种,分配的原则是每个会话的下次超时时间点(ExpirationTime)。***
参见org.apache.zookeeper.server.SessionTrackerImpl#roundToInterval

    //计算出最近的 一下次统一检测过期的时间
    private long roundToInterval(long time) {
        // We give a one interval grace period
        return (time / expirationInterval + 1) * expirationInterval;
    }

也就是说按照整除expirationInterval 的时间来分桶

会话激活

会了保持客户端会话的有效性,客户端会在会话超时时间过期范围内向服务端发送PING请求来保持会话的有效性(心跳检测)。同时,服务端需要不断地接收来自客户端的心跳检测,并且需要重新激活对应的客户端会话,这个重新激活过程称为TouchSession。会话激活不仅能够使服务端检测到对应客户端的存货性,同时也能让客户端自己保持连接状态,流程如下


会话激活

源码参见
org.apache.zookeeper.server.SessionTrackerImpl#touchSession

    synchronized public boolean touchSession(long sessionId, int timeout) {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                                     ZooTrace.CLIENT_PING_TRACE_MASK,
                                     "SessionTrackerImpl --- Touch session: 0x"
                    + Long.toHexString(sessionId) + " with timeout " + timeout);
        }
        SessionImpl s = sessionsById.get(sessionId);
        // Return false, if the session doesn't exists or marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);//计算出新的过期时间
        if (s.tickTime >= expireTime) {
            // Nothing needs to be done
            return true;
        }
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);//从旧的过期时间的"桶"中移除
        }
        s.tickTime = expireTime;
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);//移动到新的过期时间的"桶"中
        return true;
    }

代码的步骤和上面流程图一样

client什么时候会发出激活请求

  1. 客户端向服务端发送请求,包括读写请求,就会触发会话激活。
  2. 客户端发现在sessionTimeout/3时间内尚未和服务端进行任何通信,那么就会主动发起PING请求,服务端收到该请求后,就会触发会话激活。

超时检测

对于会话的超时检查而言,Zookeeper使用SessionTracker来负责,SessionTracker使用单独的线程(超时检查线程)专门进行会话超时检查,即逐个一次地对会话桶中剩下的会话进行清理。如果一个会话被激活,那么Zookeeper就会将其从上一个会话桶迁移到下一个会话桶中,如ExpirationTime 1 的session n 迁移到ExpirationTime n 中,此时ExpirationTime 1中留下的所有会话都是尚未被激活的,超时检查线程就定时检查这个会话桶中所有剩下的未被迁移的会话,超时检查线程只需要在这些指定时间点(ExpirationTime 1、ExpirationTime 2...)上进行检查即可,这样提高了检查的效率,性能也非常好。

这个会话超时的线程就是org.apache.zookeeper.server.SessionTrackerImpl,里面run方法如下

    @Override
    synchronized public void run() {
        try {
            while (running) {
                currentTime = System.currentTimeMillis();
                if (nextExpirationTime > currentTime) {//如果下一次超时检测的时间还没到,就等
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
                SessionSet set;
                set = sessionSets.remove(nextExpirationTime);//进行会话清理,这个"桶"中的会话都超时了
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        setSessionClosing(s.sessionId);//标记关闭
                        expirer.expire(s);//发起会话关闭请求
                    }
                }
                nextExpirationTime += expirationInterval;//设置下一次清理的时间
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

主要过程就是,等到下一次超时检测的周期,把对应的桶中的会话全部标记关闭,给对应client发送 会话关闭的请求

会话清理

当SessionTracker的会话超时线程检查出已经过期的会话后,就开始进行会话清理工作,大致可以分为如下七步。

1. 标记会话状态为已关闭
2. 发起会话关闭请求
3. 收集需要清理的临时节点
4. 添加节点删除事务变更
5. 删除临时节点
6. 移除会话
7. 关闭NIOServerCnxn

标记会话状态为已关闭

由于会话清理过程需要一段时间,为了保证在此期间不再处理来自该客户端的请求,SessionTracker会首先将该会话的isClosing标记为true,这样在会话清理期间接收到该客户端的心情求也无法继续处理了。

源码在上面的run方法中已经列举

发起会话关闭请求

为了使对该会话的关闭操作在整个服务端集群都生效,Zookeeper使用了提交会话关闭请求的方式,并立即交付给PreRequestProcessor进行处理。

源码在org.apache.zookeeper.server.SessionTrackerImpl#run 调用了

expirer.expire(s);//发起会话关闭请求

跟进就是
org.apache.zookeeper.server.ZooKeeperServer#expire
org.apache.zookeeper.server.ZooKeeperServer#close
org.apache.zookeeper.server.ZooKeeperServer#submitRequest(org.apache.zookeeper.server.ServerCnxn, long, int, int, java.nio.ByteBuffer, java.util.List<org.apache.zookeeper.data.Id>)
org.apache.zookeeper.server.ZooKeeperServer#submitRequest(org.apache.zookeeper.server.Request)
org.apache.zookeeper.server.PrepRequestProcessor#processRequest
发起会话关闭请求这里异步调用,上面完成生产

    public void processRequest(Request request) {
        // request.addRQRec(">prep="+zks.outstandingChanges.size());
        submittedRequests.add(request);
    }

收集需要清理的临时节点

一旦某个会话失效后,那么和该会话相关的临时节点都需要被清理,因此,在清理之前,首先需要将服务器上所有和该会话相关的临时节点都整理出来。Zookeeper在内存数据库中会为每个会话都单独保存了一份由该会话维护的所有临时节点集合,在Zookeeper处理会话关闭请求之前,若正好有以下两类请求到达了服务端并正在处理中。

节点(包含临时与非临时)删除请求,删除的目标节点正好是上述临时节点中的一个。
临时节点创建,修改请求,目标节点正好是上述临时节点中的一个。

对于第一类请求,需要将所有请求对应的数据节点路径从当前临时节点列表中移出,以避免重复删除,对于第二类请求,需要将所有这些请求对应的数据节点路径添加到当前临时节点列表中,以删除这些即将被创建但是尚未保存到内存数据库中的临时节点。

源码根据上面接着讲,上面是异步调用完成生产,下面是完成消费
org.apache.zookeeper.server.PrepRequestProcessor#run
org.apache.zookeeper.server.PrepRequestProcessor#pRequest

PrepRequestProcessor#pRequest处理会话关闭请求

进入
org.apache.zookeeper.server.PrepRequestProcessor#pRequest2Txn
对应处理如下

      case OpCode.closeSession:
                // We don't want to do this check since the session expiration thread
                // queues up this operation without being the session owner.
                // this request is the last of the session so it should be ok
                //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                HashSet<String> es = zks.getZKDatabase()
                        .getEphemerals(request.sessionId);//获取sessionId对应的临时节点的路径列表
                synchronized (zks.outstandingChanges) {
                    for (ChangeRecord c : zks.outstandingChanges) {//遍历 zk serve的事务变更队列,这些事务处理尚未完成,没有应用到内存数据库中
                        if (c.stat == null) {//如果当前变更记录没有状态信息(删除时才会出现,参照上面处理delete时的ChangeRecord构造参数)
                            // Doing a delete
                            es.remove(c.path);//避免多次删除
                        } else if (c.stat.getEphemeralOwner() == request.sessionId) {//如果变更节点是临时的,且源于当前sessionId(只有创建和修改时,stat不会为null)
                            es.add(c.path);//添加记录,最终要将添加或者修改的record再删除掉
                        }
                    }
                    for (String path2Delete : es) {//添加节点变更事务,将es中所有路径的临时节点都删掉
                        addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                                path2Delete, null, 0, null));
                    }

                    zks.sessionTracker.setSessionClosing(request.sessionId);
                }

                LOG.info("Processed session termination for sessionid: 0x"
                        + Long.toHexString(request.sessionId));
                break;

其中,outstandingChanges理解成zk serve的事务变更队列,事务还没有完成,尚未同步到内存数据库中的一个队列,表示变更记录 之后讲server的时候再讲

注意里面两个if
第一个if

  if (c.stat == null)

如果是已经有了一个删除的节点,那么es中去掉这条记录(当然原本不一定有这条记录,如果有就去掉),这样避免重复删除
第二个if

  if (c.stat.getEphemeralOwner() == request.sessionId)

这个地方书上讲的不对,只说了创建,修改的时候也会出现这种情况
就是如果变更记录的临时拥有者是当前sessionId的话,就加入es中,再删掉

添加节点删除事务变更

完成该会话相关的临时节点收集后,Zookeeper会逐个将这些临时节点转换成"节点删除"请求,并放入事务变更队列outstandingChanges中。

在上一部分代码中已经讲过了,

添加节点删除事务变更
    void addChangeRecord(ChangeRecord c) {
        synchronized (zks.outstandingChanges) {
            zks.outstandingChanges.add(c);
            zks.outstandingChangesForPath.put(c.path, c);
        }
    }

删除临时节点

FinalRequestProcessor会触发内存数据库,删除该会话对应的所有临时节点。

请求最终到了org.apache.zookeeper.server.FinalRequestProcessor#processRequest

FinalRequestProcessor完成事务,应用到内存数据库

移除会话

完成节点删除后,需要将会话从SessionTracker中删除。
同样包含在上图的org.apache.zookeeper.server.FinalRequestProcessor#processRequest中

    rc = zks.processTxn(hdr, txn);

org.apache.zookeeper.server.ZooKeeperServer#processTxn

    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
        rc = getZKDatabase().processTxn(hdr, txn);
        if (opCode == OpCode.createSession) {
            if (txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.addSession(sessionId, cst
                        .getTimeOut());
            } else {
                LOG.warn("*****>>>>> Got "
                        + txn.getClass() + " "
                        + txn.toString());
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);//移除会话
        }
        return rc;
    }

org.apache.zookeeper.server.SessionTrackerImpl#removeSession

    synchronized public void removeSession(long sessionId) {
        SessionImpl s = sessionsById.remove(sessionId);//sessionsById中移除
        sessionsWithTimeout.remove(sessionId);//sessionsWithTimeout中移除
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- Removing session 0x"
                    + Long.toHexString(sessionId));
        }
        if (s != null) {
            SessionSet set = sessionSets.get(s.tickTime);
            // Session expiration has been removing the sessions   
            if(set != null){
                set.sessions.remove(s);//sessionSets中移除
            }
        }
    }

关闭NIOServerCnxn

最后,从NIOServerCnxnFactory找到该会话对应的NIOServerCnxn,将其关闭。

源码见org.apache.zookeeper.server.FinalRequestProcessor#processRequest

关闭NIOServerCnxn

会话重连

当客户端与服务端之间的网络连接断开时,Zookeeper客户端会自动进行反复的重连,直到最终成功连接上Zookeeper集群中的一台机器。此时,再次连接上服务端的客户端有可能处于以下两种状态之一

  1. CONNECTED。如果在会话超时时间内重新连接上集群中一台服务器 。
  2. EXPIRED。如果在会话超时时间以外重新连接上,那么服务端其实已经对该会话进行了会话清理操作,此时会话被视为非法会话。

在客户端与服务端之间维持的是一个长连接,在sessionTimeout时间内,服务端会不断地检测该客户端是否还处于正常连接,服务端会将客户端的每次操作视为一次有效的心跳检测来反复地进行会话激活。因此,在正常情况下,客户端会话时一直有效的。然而,当客户端与服务端之间的连接断开后,用户在客户端可能主要看到两类异常:CONNECTION_LOSS(连接断开)和SESSION_EXPIRED(会话过期)。

CONNECTION_LOSS

此时,客户端会自动从地址列表中重新逐个选取新的地址并尝试进行重新连接,直到最终成功连接上服务器。

出现场景:网络中断或者是server没有响应
对应源码,只是出现情况的一部分,不包含所有情况
org.apache.zookeeper.ClientCnxn.SendThread#run

zk client抛出超时 zk client处理超时

org.apache.zookeeper.ClientCnxn.SendThread#cleanup

    private void cleanup() {//socket清理以及通知两个queue失去连接 以及 清理两个队列
            clientCnxnSocket.cleanup();
            synchronized (pendingQueue) {
                for (Packet p : pendingQueue) {
                    conLossPacket(p);
                }
                pendingQueue.clear();
            }
            synchronized (outgoingQueue) {
                for (Packet p : outgoingQueue) {
                    conLossPacket(p);
                }
                outgoingQueue.clear();
            }
        }

cleanUp执行完了之后,run方法会再次进入连接逻辑

    if (!clientCnxnSocket.isConnected()) {//如果clientCnxnSocket的SelectionKey为null
                        if(!isFirstConnect){//如果不是第一次连接就sleep一下
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        startConnect();//开始连接
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

重连方式:cleanUp完了之后又开始重新连接

SESSION_EXPIRED

客户端与服务端断开连接后,重连时间耗时太长,超过了会话超时时间限制后没有成功连上服务器,服务器会进行会话清理,此时,客户端不知道会话已经失效,状态还是DISCONNECTED,如果客户端重新连上了服务器,此时状态为SESSION_EXPIRED,用于需要重新实例化Zookeeper对象,并且看应用的复杂情况,重新恢复临时数据。

触发场景对应源码:
org.apache.zookeeper.ClientCnxn.SendThread#onConnected
这个没有demo验证过,应该是在这里

SESSION_EXPIRED在哪里抛出

然后依旧是org.apache.zookeeper.ClientCnxn.SendThread#run处理异常,调用cleanUp

只不过最下面,状态是CLOSED这个case
org.apache.zookeeper.ClientCnxn#conLossPacket

    private void conLossPacket(Packet p) {
        if (p.replyHeader == null) {
            return;
        }
        switch (state) {
        case AUTH_FAILED:
            p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
            break;
        case CLOSED:
            p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
            break;
        default:
            p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
        }
        finishPacket(p);
    }

SESSION_MOVED

客户端会话从一台服务器转移到另一台服务器,即客户端与服务端S1断开连接后,重连上了服务端S2,此时会话就从S1转移到了S2。当多个客户端使用相同的sessionId/sessionPasswd创建会话时,会收到SessionMovedException异常。因为一旦有第二个客户端连接上了服务端,就被认为是会话转移了。

源码,这个是在server检查的时候
org.apache.zookeeper.server.SessionTrackerImpl#checkSession

    synchronized public void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
        SessionImpl session = sessionsById.get(sessionId);
        if (session == null || session.isClosing()) {
            throw new KeeperException.SessionExpiredException();
        }
        if (session.owner == null) {
            session.owner = owner;
        } else if (session.owner != owner) {//如果owner不一致
            throw new KeeperException.SessionMovedException();
        }
    }

思考

会话激活中,客户端在sessionTimeout/3时间内尚未和服务端进行任何通信就发PING的代码在哪

image.png

而在连接上server时,org.apache.zookeeper.ClientCnxn.SendThread#onConnected定义了

    readTimeout = negotiatedSessionTimeout * 2 / 3;

因此timeToNextPing略小于sessionTimeout/3

注意准确的时间是<sessionTimeout/3的

会话清理时的两个细节注意一致性

  节点(包含临时与非临时)删除请求,删除的目标节点正好是上述临时节点中的一个。
  临时节点创建请求,创建的目标节点正好是上述临时节点中的一个。

源码讲解已经讲过了,注意就是
第一种情况,删过了,就不用再删(不用重复删)
第二种情况,如果是创建的临时节点(事务尚未完成,还没应用到内存数据库),则删掉

会话清理时,为什么有必要处理第二种遇到临时节点创建的情况

难道不是下图中第一个红箭头处,就能列举出来sessionId对应的临时节点path集合吗?
没错的,不过只能列举出来已经事务处理完成并且应用到内存数据库中的数据
这里的if是针对还没有事务处理完,不存在于内存数据库中的数据,也就是第二个红箭头处

会话清理处理第二种临时节点处理请求的必要性

问题

org.apache.zookeeper.server.SessionTrackerImpl.SessionImpl#timeout

会话的超时时间,是server根据client传递的timeout再最终确定的
这个代码体现在哪?
org.apache.zookeeper.ClientCnxn.SendThread#run中

refer

http://www.cnblogs.com/leesf456/p/6103870.html 主要参照

相关文章

网友评论

    本文标题:zk源码阅读21:会话管理SessionTrackerImp源码

    本文链接:https://www.haomeiwen.com/subject/auuqkxtx.html