zookeeper客户端和服务端维持一个TCP长连接,它们之间任何正常的通信都需要一个正常的会话。本文主要分析会话生命周期中会话状态的变化过程和客户端服务端如何管理会话。
客户端记录的会话状态有:
ZooKeeper.States
public enum States {
//第一次会话正在创建的状态
CONNECTING,
//没有使用
ASSOCIATING,
//连接建立完成的状态
CONNECTED,
//只读模式下,连接建立完成的状态
CONNECTEDREADONLY,
//会话关闭状态,包括客户端主动关闭或者会话超时
CLOSED,
//授权失败,未通过Sasl认证
AUTH_FAILED,
//会话还未创建时的初始化状态
NOT_CONNECTED;
}
原生客户端会话状态变化时的触发事件有:
public enum KeeperState {
/** Unused, this state is never generated by the server */
@Deprecated
Unknown (-1),
//客户端发现与服务端断开连接时,会马上尝试重连并触发该事件
Disconnected (0),
/** Unused, this state is never generated by the server */
@Deprecated
NoSyncConnected (1),
//非只读模式下,每次客户端刚连接上服务端的时候会触发该事件
SyncConnected (3),
//如果有权限验证的话,验证失败触发该事件
AuthFailed (4),
//只读模式下,每次客户端刚连接上服务端的时候会触发该事件
ConnectedReadOnly (5),
//如果有权限验证的话,验证成功触发该事件
SaslAuthenticated(6),
//当客户端与服务端重新通信,服务端认为会话已超时,发送会话过期响应,触发该事件
Expired (-112);
}
可以看出,会话的状态主要包括CONNECTING
(创建过程中),CONNECTED
(创建完成),CLOSED
(关闭会话)这三个状态,其中状态的改变也会触发对应的事件方便通知对应的事件监听者。
在介绍具体的会话状态变化前,先看下会话状态变更流程图:
会话创建
一次会话的创建过程中我们分析了会话的完整创建过程,此处聚焦会话的状态变化和对应触发事件。
客户端处理
通过ClientCnxn.sendThread
线程处理与服务端的连接和IO过程,第一次连接时调用
SendThread.startConnect
private void startConnect() throws IOException {
//1
state = States.CONNECTING;
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;
rwServerAddress = null;
} else {
addr = hostProvider.next(1000);
}
//2
if (ZooKeeperSaslClient.isEnabled()) {
try {
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =
new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostString());
} catch (LoginException e) {
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
//3
clientCnxnSocket.connect(addr);
}
主要流程为:
1.连接状态设置为States.CONNECTING
2.从服务器列表中选取一个服务器地址,如果需要授权校验,则进行校验
3.尝试与服务器连接,如果连接上了,客户端会发送创建会话的第一个请求,SendThread等待服务端的响应。线程中处理IO的方法为clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
ClientCnxnSocket
是和服务端底层通信的实现,它调用ClientCnxnSocketNIO.doIO
处理读写事件
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sockKey.isReadable()) {
if (!initialized) {
readConnectResult();
initialized = true;
}
··········省略不相关代码·············
}
第一次读到服务端的响应数据时,会调用readConnectResult
void readConnectResult() throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}
主要是反序列化解析响应并调用sendThread.onConnected
进行会话完成的回调处理
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
throw new SessionExpiredException(
"Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired");
}
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
}
主要流程为:
1.如果服务端返回的会话过期时间小于等于0,说明会话已经过期,将会话状态设置为States.CLOSED
,并触发Expired
事件
2.根据响应内容设置客户端会话相关属性,如readTimeout
, ·connectTimeout,
sessionId`等
3.根据客户端是否只读,设置会话状态和触发事件
- 如果是只读客户端,将会话状态设置为
States. CONNECTEDREADONLY
,并触发ConnectedReadOnly
事件 - 如果不是只读客户端,将会话状态设置为
States. CONNECTED
,并触发SyncConnected
事件
服务端处理
因为服务端通过会话管理器来管理会话,所以先介绍下会话管理器的内容。
服务端初始化时会初始化自己的会话管理器SessionTracker sessionTracker
,Leader服务器的实现为:LeaderSessionTracker
,Follower和Observer服务器的实现为LearnerSessionTracker
LeaderSessionTracker
:负责所有会话激活,会话超时检查,会话清理。
public class LeaderSessionTracker extends UpgradeableSessionTracker {
//是否为本地session,该值一般为false
private final boolean localSessionsEnabled;
//全局会话管理器
private final SessionTrackerImpl globalSessionTracker;
/**
* Server id of the leader
*/
private final long serverId;
public LeaderSessionTracker(SessionExpirer expirer,
ConcurrentMap<Long, Integer> sessionsWithTimeouts,
int tickTime, long id, boolean localSessionsEnabled,
ZooKeeperServerListener listener) {
this.globalSessionTracker = new SessionTrackerImpl(
expirer, sessionsWithTimeouts, tickTime, id, listener);
this.localSessionsEnabled = localSessionsEnabled;
if (this.localSessionsEnabled) {
createLocalSessionTracker(expirer, tickTime, id, listener);
}
serverId = id;
}
··········
}
主要成员变量为SessionTrackerImpl globalSessionTracker
,是全局会话管理的实现。它的主要成员变量为:
//key:sessionId value:session实体 ,每个会话都会保存其中
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
new ConcurrentHashMap<Long, SessionImpl>();
//key:sessionId value:会话超时时间 ,该数据结构与内存数据库相连通,会被定期持久化到快照文件中
private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
//将会话按照各自的过期时间(优化为心跳时间的整数倍)分桶存放,可快速用于会话的超时校验
private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
//当前服务器创建会话的最新sessionId
private final AtomicLong nextSessionId = new AtomicLong();
//会话超时清理器
private final SessionExpirer expirer;
//保存本地会话信息
private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
//本地会话相关
protected LocalSessionTracker localSessionTracker;
其中sessionExpiryQueue
按照每个会话的过期时间分桶管理会话。
ExpiryQueue
//key:每一个session实体,value:最近一次会话激活时计算出来的过期时间点
private final ConcurrentHashMap<E, Long> elemMap =
new ConcurrentHashMap<E, Long>();
//key:过期时间点,value:在这个时间点过期的会话集合
private final ConcurrentHashMap<Long, Set<E>> expiryMap =
new ConcurrentHashMap<Long, Set<E>>();
//expirer的下一个会话过期检查时间
private final AtomicLong nextExpirationTime = new AtomicLong();
//心跳时间
private final int expirationInterval;
public ExpiryQueue(int expirationInterval) {
this.expirationInterval = expirationInterval;
nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
}
//计算过期时间
private long roundToNextInterval(long time) {
return (time / expirationInterval + 1) * expirationInterval;
}
所有会话都被按照各自的过期时间点分批放在expiryMap
中,正常来说会话的过期时间点应该为:会话创建时间(当前时间) + 会话的超市时间
,但是每个会话的创建时间是很随机的,服务端不可能时时刻刻检查每一个会话是否过期了。心跳时间就是大体保证服务端定期检查会话的时间间隔。如果将会话管理器的定期检查会话的时间点
和会话的过期时间点
都转化为心跳时间的整数倍,那么就比较好管理会话。
通过roundToNextInterval
方法将这些时间化为心跳时间的整数倍。
会话的分桶管理示意图为:
globalSessionTracker线程会不断从sessionExpiryQueue中获取下一个过期时间点nextExpirationTime
已经超时的会话,调用expirer.expire
进行会话清理。
public void run() {
try {
while (running) {
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
for (SessionImpl s : sessionExpiryQueue.poll()) {
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
LearnerSessionTracker
:一是保存当前follower或observer服务器的会话,当leader服务器发送服务间心跳时,会把当前所有会话响应给leader,用于会话激活。二是如果会话是本地会话,当遇到必须升级为全局会话的情况,需要从LearnerSessionTracker取出会话交给leader创建全局会话。
主要成员变量为:
private final SessionExpirer expirer;
// key:sessionId, value:sessionTimeout 用于将会话交给leader激活
private final AtomicReference<Map<Long, Integer>> touchTable =
new AtomicReference<Map<Long, Integer>>();
private final long serverId;
//当前服务器创建会话的最新sessionId
private final AtomicLong nextSessionId = new AtomicLong();
//是否可创建本地会话,一般为false
private final boolean localSessionsEnabled;
//全局会话,和定时快照有关
private final ConcurrentMap<Long, Integer> globalSessionsWithTimeouts;
//保存本地会话信息
private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
//本地会话相关
protected LocalSessionTracker localSessionTracker;
对于服务端来说,一个会话创建,会话信息会保存在leader服务器globalSessionTracker中的globalSessionTrackersessionsById , sessionsWithTimeout , sessionExpiryQueue
中。同时也会保存在learner服务器LearnerSessionTracker中的touchTable,globalSessionsWithTimeouts
中。所发送的响应数据为:
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
也就是会把会话的sessionTimeout,sessionId,sessionPasswd信息发送给客户端。
心跳维持
为了保持客户端会话的有效性,客户端在会话超时时间内会向服务端发送PING请求来保持有效性。服务端接收到PING请求后会重新计算当前会话的过期时间,激活会话。
客户端处理
客户端主动发送PING的逻辑在sendThread.run
中
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
while (state.isAlive()) {
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
············省略无关代码········
}
其中timeToNextPing
为下次发送PING的剩余时间,创建完成会话时会把readTimeout
设置为会话超时时间的2/3,clientCnxnSocket.getIdleSend
是指距离最后一次发送数据的时间间隔。所以客户端主动PING的时间间隔大体为1/3的会话时间,此外如果超过了MAX_SEND_PING_INTERVAL (10s)
客户端没有主动和服务端通信,也会发送PING
服务端处理
对于服务端来说,不管是客户端的主动PING还是其他类型的通信都会激活会话。
如果客户端是与Leader服务器建立的会话,Leader服务器激活会话的流程图为:
激活会话的过程在
SessionTrackerImpl.touchSession
synchronized public boolean touchSession(long sessionId, int timeout) {
SessionImpl s = sessionsById.get(sessionId);
if (s == null) {
logTraceTouchInvalidSession(sessionId, timeout);
return false;
}
//1
if (s.isClosing()) {
logTraceTouchClosingSession(sessionId, timeout);
return false;
}
//2
updateSessionExpiry(s, timeout);
return true;
}
1.如果会话已经关闭,说明此时会话已经超时了,将不在激活,忽略掉这个客户端请求。
2.激活会话
updateSessionExpiry
private void updateSessionExpiry(SessionImpl s, int timeout) {
logTraceTouchSession(s.sessionId, timeout, "");
sessionExpiryQueue.update(s, timeout);
}
主要调用sessionExpiryQueue.update
public Long update(E elem, int timeout) {
//1
Long prevExpiryTime = elemMap.get(elem);
long now = Time.currentElapsedTime();
Long newExpiryTime = roundToNextInterval(now + timeout);
if (newExpiryTime.equals(prevExpiryTime)) {
// No change, so nothing to update
return null;
}
//2
// First add the elem to the new expiry time bucket in expiryMap.
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
// Construct a ConcurrentHashSet using a ConcurrentHashMap
set = Collections.newSetFromMap(
new ConcurrentHashMap<E, Boolean>());
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
set.add(elem);
//3
// Map the elem to the new expiry time. If a different previous
// mapping was present, clean up the previous expiry bucket.
prevExpiryTime = elemMap.put(elem, newExpiryTime);
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
Set<E> prevSet = expiryMap.get(prevExpiryTime);
if (prevSet != null) {
prevSet.remove(elem);
}
}
return newExpiryTime;
}
1.重新计算该会话的新的超时时间点,如果和当前的超时时间点一样,直接返回
2.将会话加入到新的超时时间点集合中,并从原超时时间点集合中删除
示意图为:
如果客户端是与Learner服务器建立的会话,首先会调用
LearnerSessionTracker.touchSession
public boolean touchSession(long sessionId, int sessionTimeout) {
touchTable.get().put(sessionId, sessionTimeout);
return true;
}
使得会话一直保存在touchTable
中
其次,通过leader服务器与Learner服务器的定期心跳来完成Learner服务器上会话在leader服务器的激活。
先看下服务器间的心跳维持:
1.leader服务器主动发送PING:
leader.lead
while (true) {
synchronized (this) {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.tickTime / 2;
while (cur < end) {
wait(end - cur);
cur = Time.currentElapsedTime();
}
for (LearnerHandler f : getLearners()) {
f.ping();
}
··············省略无关代码··········
}
可以看到在1/2心跳时间间隔内,会主动发送PING给 learner服务器。
2.learner服务器接收PING请求之后的处理
Learner.ping
protected void ping(QuorumPacket qp) throws IOException {
// Send back the ping with our session data
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
Map<Long, Integer> touchTable = zk.getTouchSnapshot();
//
for (Entry<Long, Integer> entry : touchTable.entrySet()) {
dos.writeLong(entry.getKey());
dos.writeInt(entry.getValue());
}
qp.setData(bos.toByteArray());
writePacket(qp, true);
}
可以看到会把touchTable
发送给leader服务器,当leader接收到来自follower的PING响应时,会遍历touchTable
中的session,调用LearnerSessionTracker.touchSession
来激活这些会话。通过服务集群之间的心跳来激活learner服务器上的会话,是很巧妙的一种方式。
只要客户端能够在指定时间内发送数据,服务端能够顺利激活会话,会话的连接状态就一直是States. CONNECTED
会话连接断开之后的重连
可能由于网络不稳定等原因导致网络连接断开,主要讨论会话超时时间内的连接断开重连
和会话超时之后的重连
这两种情况。
会话超时时间内的socket连接断开
客户端处理
1.断开处理
当客户端sendThread进行IO操作出现可确定连接异常时调用cleanup
方法
private void cleanup() {
//1
clientCnxnSocket.cleanup();
synchronized (pendingQueue) {
for (Packet p : pendingQueue) {
//2
conLossPacket(p);
}
pendingQueue.clear();
}
// We can't call outgoingQueue.clear() here because
// between iterating and clear up there might be new
// packets added in queuePacket().
//3
Iterator<Packet> iter = outgoingQueue.iterator();
while (iter.hasNext()) {
Packet p = iter.next();
conLossPacket(p);
iter.remove();
}
}
1.底层socket的处理clientCnxnSocket.cleanup
,关闭掉当前socket,并注销SelectionKey sockKey
,这样sendThread便可知道连接断开,需要进行重连了
2.通知等待请求队列和发送请求队列连接已断开
conLossPacket
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
}
switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
}
finishPacket(p);
}
当连接断开刚断开时,会设置请求的响应头err信息为KeeperException.Code.CONNECTIONLOSS
,在后续的请求响应处理中finishPacket
会根据该err信息通知该请求路径上关联的所有watchers,发生了连接断开事件。
private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
// Add all the removed watch events to the event queue, so that the
// clients will be notified with 'Data/Child WatchRemoved' event type.
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
//查出路径上注册的所有watchers
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
//触发连接断开事件
queueEvent(p.watchDeregistration.getClientPath(), err,
watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
}
}
}
queueEvent
oid queueEvent(String clientPath, int err,
Set<Watcher> materializedWatchers, EventType eventType) {
KeeperState sessionState = KeeperState.SyncConnected;
if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
|| KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
sessionState = Event.KeeperState.Disconnected;
}
WatchedEvent event = new WatchedEvent(eventType, sessionState,
clientPath);
eventThread.queueEvent(event, materializedWatchers);
}
可以看出会发送Event.KeeperState.Disconnected
事件
3.迭代outgoingQueue
发送队列中的请求,通知新加入的请求连接断开
4.此外,如果确定连接已断开,再往发送队列发送数据时也会调用conLossPacket
通知请求连接断开
发送数据
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
//通知请求已断开
conLossPacket(packet);
}
}
2.客户端重连
sendThread线程发现客户端连接断开了,会选择下一个服务器地址,进行重连,此时会带上sessionId
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
}
3.接收会话创建成功响应
同第一次会话创建,将会话状态设置为States. CONNECTED,并触发SyncConnected事件
服务端处理
1.断开处理
服务端发现无法与客户端的ServerCnxn
通信时,NIOServerCnxn.doIO
会catch住异常,调用NIOServerCnxn.close
从cnxns列表中移除ServerCnxn,并关闭当前连接
/**
* Close the cnxn and remove it from the factory cnxns list.
*/
@Override
public void close() {
if (!factory.removeCnxn(this)) {
return;
}
if (zkServer != null) {
zkServer.removeCnxn(this);
}
if (sk != null) {
try {
// need to cancel this selection key from the selector
sk.cancel();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("ignoring exception during selectionkey cancel", e);
}
}
}
closeSock();
}
2.收到客户端的重连创建会话请求
一般客户端会选择另外一台服务端发送会话创建请求,当服务器在本地session校验通过后,便会激活会话,创建与客户端的socket连接。
处理连接请求:
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
long sessionId = connReq.getSessionId();
if (sessionId == 0) {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);
} else {
//sessionId不为0,表示重连
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId);
}
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}
}
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
int sessionTimeout) throws IOException {
if (checkPasswd(sessionId, passwd)) {
revalidateSession(cnxn, sessionId, sessionTimeout);
} else {
LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
+ " for session 0x" + Long.toHexString(sessionId));
finishSessionInit(cnxn, false);
}
}
3.发送会话创建成功响应数据:
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
会话超时之后的重连
服务端处理
1.会话清理
Leader服务器的会话管理器线程会检查出过期的会话,进行会话清理。清理操作为:
SessionTrackerImpl.run
for (SessionImpl s : sessionExpiryQueue.poll()) {
//1
setSessionClosing(s.sessionId);
expirer.expire(s);
}
expirer.expire
public void expire(Session session) {
long sessionId = session.getSessionId();
LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
+ ", timeout of " + session.getTimeout() + "ms exceeded");
//2
close(sessionId);
}
private void close(long sessionId) {
Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
setLocalSessionFlag(si);
submitRequest(si);
}
- 标记会话状态为已关闭,
s.isClosing = true
- 发起
OpCode.closeSession
事务请求,主要做的事情是- 删除与会话相关的临时节点
包括即将会被创建但为保存到内存数据库中的临时节点。 - 移除会话
主要从服务器各自的SessionTracker中移除 - 关闭NIOServerCnxn
- 删除与会话相关的临时节点
2.会话过期响应
此时服务端已经没有了当前会话的sesionId,校验session revalidateSession
时无法重新激活会话
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
//返回false
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(sessionId) +
" is valid: " + rc);
}
finishSessionInit(cnxn, rc);
}
finishSessionInit(cnxn, rc);
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
else {
LOG.info("Invalid session 0x"
+ Long.toHexString(cnxn.getSessionId())
+ " for client "
+ cnxn.getRemoteSocketAddress()
+ ", probably expired");
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
}
}
此时会发送ServerCnxnFactory.closeConn
给客户端,使其关闭连接。
/**
* The buffer will cause the connection to be close when we do a send.
*/
static final ByteBuffer closeConn = ByteBuffer.allocate(0);
客户端处理
1.重连
这种场景表示在会话超时时间之后客户端才发送创建会话的重连请求到服务端。
2.接收关闭响应
因为此时收到的响应没有negotiatedSessionTimeout,所以会将连接状态设置为States.CLOSED
,并发送KeeperState.Expired
事件,通知所有watcher。同时等待eventThread处理完所有事件,将线程状态标记为isRunning = false
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo;
warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired";
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
网友评论