session在zookeeper中是一个不能避开的概念,临时节点(包括临时普通节点和临时顺序节点)都是与session关联的,临时节点将在session超期后被删除。本篇我们来看一下session的创建与销毁,过期session的处理等内容。
session创建
客户端的连接请求到达服务器后最早由processConnectRequest处理,我们分段来看一下processConnectRequest的处理流程。
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
反序列化connect请求,connect请求参数如下:
private int protocolVersion;
private long lastZxidSeen;
private int timeOut;
private long sessionId;
private byte[] passwd;
字段名 | 含义 |
---|---|
protocolVersion | 协议版本号 |
lastZxidSeen | 客户端过去看到的最大的zxid |
timeOut | session的超时时间 |
sessionId | session的id,新连接传0,重连传之前服务器返回的sessionid |
passwd | session对应的密码,由服务器返回给客户端,重连时需要传递该字段 |
判断是否是readonly请求,ReadOnlyZooKeeperServer只处理readonly的连接,ReadOnlyZooKeeperServer在前面的集群启动中我们讲过,就是使得服务器在选举过程中或者发生分区时依旧可以读数据。
boolean readOnly = false;
try {
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
}
if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
判断客户端最后看到的zxid是否大于我们的最后处理zxid,如果是则关闭连接,让客户端尝试连接有最新数据的其它服务器。
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
读取session的超时值,校验并规格化:
int sessionTimeout = connReq.getTimeOut();
byte[] passwd = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
创建或重连session:
if (sessionId == 0) {
long id = createSession(cnxn, passwd, sessionTimeout);
} else {
long clientSessionId = connReq.getSessionId();
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
}
重连session我们放到下一节分析,我们来看一下createSession方法的逻辑,代码不长,我们就直接一次性放出来。
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
if (passwd == null) {
passwd = new byte[0];
}
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
cnxn.setSessionId(sessionId);
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
submitRequest(si);
return sessionId;
}
-
sessionTracker.createSession(timeout),创建session并拿到sessionid。SessionTracker是一个接口,有4个实现类。
类名 用途 SessionTrackerImpl 单机使用。 LocalSessionTracker 继承了SessionTrackerImpl,给UpgradeableSessionTracker用,UpgradeableSessionTracker包含一个LocalSessionTracker。 UpgradeableSessionTracker LeaderSessionTracker和LearnerSessionTracker的抽象基类。 LeaderSessionTracker leader使用。 LearnerSessionTracker follower和observer使用。 -
生成session密码。
-
提交session创建请求。
我们来看一下单机模式下sessionTracker.createSession(timeout)做了些什么:
public long createSession(int sessionTimeout) {
long sessionId = nextSessionId.getAndIncrement();
trackSession(sessionId, sessionTimeout);
return sessionId;
}
public synchronized boolean trackSession(long id, int sessionTimeout) {
boolean added = false;
SessionImpl session = sessionsById.get(id);
if (session == null) {
session = new SessionImpl(id, sessionTimeout);
}
SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
if (existedSession != null) {
session = existedSession;
} else {
added = true;
}
updateSessionExpiry(session, sessionTimeout);
return added;
}
首先得到下一个sessionid,然后将这个sessionid纳入管理。
来看一下leader模式下sessionTracker.createSession(timeout)做了些什么:
public long createSession(int sessionTimeout) {
if (localSessionsEnabled) {
return localSessionTracker.createSession(sessionTimeout);
}
return globalSessionTracker.createSession(sessionTimeout);
}
localSessionsEnabled由配置文件决定(zoo.cfg),这个选项决定了UpgradeableSessionTracker是使用LocalSessionTracker还是SessionTrackerImpl来实现。使用LocalSessionTracker的情况下,创建session时不会扩散到整个集群,而在需要升级时,根据localSessionsUpgradingEnabled的值确定是否需要将本地session升级为全局session扩散到整个集群。
再来看一下follower或者observer模式下sessionTracker.createSession(timeout)做了些什么:
public long createSession(int sessionTimeout) {
if (localSessionsEnabled) {
return localSessionTracker.createSession(sessionTimeout);
}
return nextSessionId.getAndIncrement();
}
判断localSessionsEnabled是否开启,如果开启了,创建一个本地session,如果没有,创建一个全局session。
接下来我们看session创建请求在processor链的处理过程。
先看单机:
-
首先是PrepRequestProcessor:
int to = request.request.getInt(); request.setTxn(new CreateSessionTxn(to)); request.request.rewind(); zks.sessionTracker.trackSession(request.sessionId, to); zks.setOwner(request.sessionId, request.getOwner());
很简单,调用SessionTracker的trackSession方法。
-
再看FinalRequestProcessor:
applyRequest
ProcessTxnResult rc = zks.processTxn(request);
processTxn
processTxnForSessionEvents(request, hdr, request.getTxn());
processTxnForSessionEvents
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { int opCode = (request == null) ? hdr.getType() : request.type; long sessionId = (request == null) ? hdr.getClientId() : request.sessionId; if (opCode == OpCode.createSession) { if (hdr != null && txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; sessionTracker.commitSession(sessionId, cst.getTimeOut()); } else if (request == null || !request.isLocalSession()) { LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString()); } } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } }
很简单,调用SessionTracker的commitSession方法。
case OpCode.createSession: {
lastOp = "SESS";
updateStats(request, lastOp, lastZxid);
zks.finishSessionInit(request.cnxn, true);
return;
}
zks.finishSessionInit给客户端返回正确的响应,包括session超时值,sessionid和session对应的密码。
再看leader:
分两种情况,开启了localSessionsEnabled和没开启localSessionsEnabled。
先看开启了localSessionsEnabled的情况:
-
首先是PrepRequestProcessor:
pRequestHelper
case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); } break;
如果是客户端发来的请求,不做任何处理。如果是follower或者observer发来的请求,则生成hdr头和txn记录,调用SessionTracker的trackSession方法。
-
然后是ProposalRequestProcessor:
processRequest
if (shouldForwardToNextProcessor(request)) { nextProcessor.processRequest(request); } if (request.getHdr() != null) { try { zks.getLeader().propose(request); } catch (XidRolloverException e) { throw new RequestProcessorException(e.getMessage(), e); } syncProcessor.processRequest(request); }
对于客户端发来的请求,hdr头为空,什么也不做,直接交给下一个processor处理。如果是follower或者observer发来的请求则提交提案,等待半数节点完成session创建。
-
最后是FinalRequestProcessor:
与单机模式下的情况完全相同。
再看没开启localSessionsEnabled的情况:
-
首先是PrepRequestProcessor:
pRequestHelper
case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); } break;
pRequest2Txn
int to = request.request.getInt(); request.setTxn(new CreateSessionTxn(to)); request.request.rewind(); zks.sessionTracker.trackSession(request.sessionId, to); zks.setOwner(request.sessionId, request.getOwner());
生成hdr头和txn记录,调用SessionTracker的trackSession方法。
-
然后是ProposalRequestProcessor:
processRequest
if (shouldForwardToNextProcessor(request)) { nextProcessor.processRequest(request); } if (request.getHdr() != null) { try { zks.getLeader().propose(request); } catch (XidRolloverException e) { throw new RequestProcessorException(e.getMessage(), e); } syncProcessor.processRequest(request); }
提交提案,等待半数节点完成session创建。
-
最后是FinalRequestProcessor:
与单机模式下的情况完全相同。
然后看follower:
分两种情况,开启了localSessionsEnabled和没开启localSessionsEnabled。
先看开启了localSessionsEnabled的情况:
-
首先是FollowerRequestProcessor:
processRequest
upgradeRequest = zks.checkUpgradeSession(request); if (upgradeRequest != null) { queuedRequests.add(upgradeRequest); }
如果需要升级session,发送createSession请求来升级session。
run
case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { zks.getFollower().request(request); } break;
如果是升级session的请求,则将请求转给leader处理。如果是客户端发来的请求,不做任何处理。
-
最后是FinalRequestProcessor:
与单机模式下的情况完全相同。
再看没开启localSessionsEnabled的情况:
-
首先是FollowerRequestProcessor:
run
case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { zks.getFollower().request(request); } break;
将请求转给leader处理。
-
最后是FinalRequestProcessor:
与单机模式下的情况完全相同。
observer的过程跟follower差不多,就不分析了。
大家可能会疑惑,本地session的id是不是会跟全局session的id冲突,比如客户端连接了服务器1创建了一个session,另一个客户端连接服务器2创建了一个session,这两个session的id是不是会冲突。实际上zookeeper解决这个问题的方法是每个server根据server的id不同,创建的session的起始值也不一样,所以不会冲突。这个初始值还与当前时间有关,这样来避免重启后的sessionid(重启后会从数据库快照和事物日志中重建session,也就是说session实际上也是持久化的)与重启前的sessionid冲突,代码如下:
public static long initializeNextSessionId(long id) {
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id << 56);
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
++nextSid; // this is an unlikely edge case, but check it just in case
}
return nextSid;
}
session重连
入口是:
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
if (checkPasswd(sessionId, passwd)) {
revalidateSession(cnxn, sessionId, sessionTimeout);
} else {
LOG.warn(
"Incorrect password from {} for session 0x{}",
cnxn.getRemoteSocketAddress(),
Long.toHexString(sessionId));
finishSessionInit(cnxn, false);
}
}
逻辑很简单,检查session对应的密码是否正确,若不正确,给客户端返回错误。若正确,调用revalidateSession。
protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
finishSessionInit(cnxn, rc);
}
调用sessionTracker.touchSession,根据结果给客户端返回正确或错误。touchSession返回session是否没过期,如果没过期还会更新过期时间。
session关闭:
客户端正常关闭时发送closeSession包,我们来看一下closeSession包的处理过程。
先看单机:
-
首先是PrepRequestProcessor:
if (ZooKeeperServer.isCloseSessionTxnEnabled()) { request.setTxn(new CloseSessionTxn(new ArrayList<String>(es))); } zks.sessionTracker.setSessionClosing(request.sessionId);
如果开启了zookeeper.closeSessionTxn.enabled,则设置tnx为所有需要删除的临时节点的路径,默认开启。设置该session的状态为正在关闭。
-
最后是FinalRequestProcessor
applyRequest
ProcessTxnResult rc = zks.processTxn(request); if (request.type == OpCode.closeSession && connClosedByClient(request)) { if (closeSession(zks.serverCnxnFactory, request.sessionId) || closeSession(zks.secureServerCnxnFactory, request.sessionId)) { return rc; } }
ZooKeeperServer的processTnx方法
sessionTracker.removeSession(sessionId); if (txn != null) { killSession(sessionId, header.getZxid(), ephemerals.remove(sessionId), ((CloseSessionTxn) txn).getPaths2Delete()); } else { killSession(sessionId, header.getZxid()); }
从sessionTracker中移除session,并调用killSession方法
void killSession(long session, long zxid, Set<String> paths2DeleteLocal, List<String> paths2DeleteInTxn) { if (paths2DeleteInTxn != null) { deleteNodes(session, zxid, paths2DeleteInTxn); } if (paths2DeleteLocal == null) { return; } if (paths2DeleteInTxn != null) { for (String path: paths2DeleteInTxn) { paths2DeleteLocal.remove(path); } if (!paths2DeleteLocal.isEmpty()) { LOG.warn( "Unexpected extra paths under session {} which are not in txn 0x{}", paths2DeleteLocal, Long.toHexString(zxid)); } } deleteNodes(session, zxid, paths2DeleteLocal); }
删除session关联的临时节点
processRequest
if (request.type == OpCode.closeSession) { cnxn.sendCloseSession(); }
关闭与客户端的连接。
再看其它:
其它情况与单机差不多,不同的是如果是一个本地session,closeSession的请求只需要在本地做,且不需要清理临时节点,不需要扩散到整个集群,因为如果session关联了临时节点,该session会被升级为全局session。如果是全局session,closeSession的请求需要扩散到整个集群。
session清理
入口是SessionTrackerImpl的run方法:
public void run() {
try {
while (running) {
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
for (SessionImpl s : sessionExpiryQueue.poll()) {
ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
}
sessionExpiryQueue是一个ExpiryQueue,expire的间隔由zoo.cfg中的tickTime决定,默认值是3000。
ExpiryQueue的实现比较精妙,主要有三个接口方法update,getWaitTime和poll,update往队列中放,getWaitTime获取到下次执行需要等待的时间,poll获取本次需要过期的session的集合。服务器每次收到一个请求(包括心跳请求)都会调用SessionTracker的touchSession方法,touchSession会调用update方法。
public Long update(E elem, int timeout) {
Long prevExpiryTime = elemMap.get(elem);
long now = Time.currentElapsedTime();
Long newExpiryTime = roundToNextInterval(now + timeout);
if (newExpiryTime.equals(prevExpiryTime)) {
return null;
}
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
set.add(elem);
prevExpiryTime = elemMap.put(elem, newExpiryTime);
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
Set<E> prevSet = expiryMap.get(prevExpiryTime);
if (prevSet != null) {
prevSet.remove(elem);
}
}
return newExpiryTime;
}
首先判断新过期的时间是否与之前计算的时间相同,如果相同直接返回。从expiryMap中获取需要在newExpiryTime过期的集合,如果没有则新建一个,将元素放入该集合中,然后将元素从旧集合中移除。roundToNextInterval以expirationInterval(tickTime)向上取整,比如传入时间是1500,expirationInterval是3000,则计算出的结果是3000。
public long getWaitTime() {
long now = Time.currentElapsedTime();
long expirationTime = nextExpirationTime.get();
return now < expirationTime ? (expirationTime - now) : 0L;
}
获取到下次过期时间的间隔。
public Set<E> poll() {
long now = Time.currentElapsedTime();
long expirationTime = nextExpirationTime.get();
if (now < expirationTime) {
return Collections.emptySet();
}
Set<E> set = null;
long newExpirationTime = expirationTime + expirationInterval;
if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
set = expiryMap.remove(expirationTime);
}
if (set == null) {
return Collections.emptySet();
}
return set;
}
拉去下一个过期的集合,并更新nextExpirationTime。
继续看session清理,setSessionClosing(s.sessionId)将session状态置为关闭中,expirer.expire(s)将session过期的真正逻辑。我们来看expirer.expire(s)做了什么:
public void expire(Session session) {
long sessionId = session.getSessionId();
close(sessionId);
}
然后就是跟正常客户端发来的关闭session的请求一样处理。
值得注意的是,follower收到客户端的非写请求后不会转发到leader,而全局session的超时清理是由leader处理的,如果不做任何处理,客户端的非写操作(包括ping)leader都感知不到,就会导致全局session被意外清理。ZooKeeper的解决方式是LearnerSessionTracker维护了一个touchTable,在收到leader发来的PING请求时(leader发送PING请求的间隔大概是expirationInterval的一半),为touchTable的所有元素发送ping请求,然后清空touchTable。
网友评论