概述
本节重点分析下客户端启动流程
启动流程
客户端启动流程.png1.
构造ZooKeeper,如下代码为例:
public static void main(String[] args) {
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 3000000, new DemoTest());
}
@Override
public void process(WatchedEvent event) {
System.out.println("接收到watch通知:" + event);
}
2.
构造hostProvider
// 解析服务端地址
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
- 解析
connectString
每个IP分别生成一个IPV4InetSocketAddress
一个IPV6InetSocketAddress
3.
创建ClientCnxn
// 创建ClientCnxn 重点看 getClientCnxnSocket(), connectStringParser.getChrootPath():跟路径
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
-
connectStringParser.getChrootPath
解析跟路径,例如localhost:2181/test1,根据路径为/test1,后续改客户端所有操作都基于该根路径; -
getClientCnxnSocket
反射生成ClientCnxnSocket实例,默认为ClientCnxnSocketNIO(下面也是以ClientCnxnSocketNIO为例进行分析),也可以通过zookeeper.clientCnxnSocket
进行配置;
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
// 默认0
this.sessionId = sessionId;
// 默认 new byte[16]
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
// 客户端根路径
this.chrootPath = chrootPath;
// 连接超时时间
connectTimeout = sessionTimeout / hostProvider.size();
// 响应超时时间
readTimeout = sessionTimeout * 2 / 3;
// 是否只读客户端,默认false
readOnly = canBeReadOnly;
// 创建请求处理线程
sendThread = new SendThread(clientCnxnSocket);
// 创建事件处理线程
eventThread = new EventThread();
}
4.
构造EventThread / SendThread
SendThread(ClientCnxnSocket clientCnxnSocket) {
// 线程名后缀 -SendThread
super(makeThreadName("-SendThread()"));
// 连接中
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true);
}
5.
启动启动EventThread线程,执行run方法
public void run() {
try {
isRunning = true;
while (true) {
// 从LinkedBlockingQueue中获取事件,为空时阻塞
Object event = waitingEvents.take();
// 连接关闭时默认放入 eventOfDeath
if (event == eventOfDeath) {
wasKilled = true;
} else {
// 处理事件
processEvent(event);
}
if (wasKilled)
// 连接关闭,并且等待事件队列为空时break
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",Long.toHexString(getSessionId()));
}
-
while (true)
死循环; -
waitingEvents.take()
阻塞队列LinkedBlockingQueue waitingEvents为空阻塞;
6.
启动SendThread线程,循环第一次
执行run方法
忽略无关代码
public void run() {
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
// 未连接,SelectionKey为空
if (!clientCnxnSocket.isConnected()) {
// 建立连接
startConnect(serverAddress);
}
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
}
}
}
-
state.isAlive()
状态不是CLOSED或AUTH_FAILED一直循环;
7.
建立连接
void connect(InetSocketAddress addr) throws IOException {
// 创建SocketChannel
SocketChannel sock = createSock();
try {
// 注册事件,创建连接
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
SocketChannel createSock() throws IOException {
SocketChannel sock;
// 打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址)
sock = SocketChannel.open();
// 设置非阻塞模式
sock.configureBlocking(false);
sock.socket().setSoLinger(false, -1);
// 关闭Nagle算法
sock.socket().setTcpNoDelay(true);
return sock;
}
private final Selector selector = Selector.open();
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
// 注册OP_CONNECT事件到多路复用器中
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
// 异步连接服务端
boolean immediateConnect = sock.connect(addr);// return false; 非阻塞同步返回false
if (immediateConnect) {
sendThread.primeConnection();
}
}
-
SocketChannel.open()
打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址); -
sock.configureBlocking(false)
设置非阻塞模式; -
sock.socket().setTcpNoDelay(true)
关闭Nagle算法,降低小包的延迟; -
Selector.open()
创建多路复用器; -
sock.register(selector, SelectionKey.OP_CONNECT)
注册OP_CONNECT事件到多路复用器中; -
sock.connect(addr)
非阻塞模式,服务端直接返回,异步创建连接;
8.
doTransport多路复用器事件处理
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
// select没有事件产生的话阻塞waitTimeOut
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
// 从多路复用器获取事件集合
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// OP_CONNECT事件
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
// 处理创建连接请求
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
-
selector.select(waitTimeOut)
select没有事件产生的话阻塞waitTimeOut; -
selector.selectedKeys()
从多路复用器获取事件集合; -
sendThread.primeConnection()
如果是OP_CONNECT事件,交给sendThread处理连接请求;
9.
primeConnection处理连接请求
void primeConnection() throws IOException {
// 构造ConnectRequest
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
// 构造ConnectRequest的Packet包入队outgoingQueue
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
-
new ConnectRequest
构造ConnectRequest -
new Packet
构造ConnectRequest的Packet包,注意RequestHeader为null,后面会通过RequestHeader是否为空判断是否是ConnectRequest包; -
outgoingQueue.addFirst
Packet入队outgoingQueue;SendThread再次循环调用到doTransport
时处理这个packet;
10.
循环第二次
执行SendThread.run
此时多路复用器获取到OP_WRITE事件,执行doIO中sockKey.isWritable()逻辑,向服务端写连接请求包;
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
// 获取需要发送的 packet
Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
// lastSend = now;
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
// 生成ByteBuffer
p.createBB();
}
// write bytebuffer
sock.write(p.bb);
// 数据包已经写完
if (!p.bb.hasRemaining()) {
// 已发送数据包计数+1
sentCount++;
// outgoingQueue中移除该packet
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
// packet放到已发送等待响应的队列 pedingQueue
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// outgoingQueue为空,表示没有数据包要发送:关闭写兴趣标记;后续在doIO或doTranspost等地方调用enableWrite打开;
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
// 注册OP_WRITE事件
enableWrite();
}
}
}
-
synchronized(outgoingQueue)
outgoingQueue/pendingQueue是非线程安全队列,因此直接把整个逻辑块进行同步操作; -
findSendablePacket
从outgoingQueue获取Packet; -
createBB
序列化ConnectRequest,并写入ByteBuffer; -
sock.write(p.bb)
把ByteBuffer写到服务端Socket接收缓冲区; -
outgoingQueue.removeFirstOccurrence(p)/pendingQueue.add(p)
数据包写完后!p.bb.hasRemaining()
,改Packet从待发送队列中移除,加入到已发送待响应队列; -
disableWrite/enableWrite
写出后,如果已经没有再需要写出的数据包,则停止注册OP_WRITE事件,后续等再有数据需要写时再通过enableWrite注册OP_WRITE事件;
11.
循环第三次
执行SendThread.run
此时服务端处理完ConnectRequest,多路复用器获取到OP_READ事件,执行doIO中sockKey.isReadable()逻辑,读取服务端连接响应包;
if (sockKey.isReadable()) {
// 读取服务端响应包
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId) + ", likely server has closed socket");
}
// incomingBuffer默认4字节,先读取完4字节响应头
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
// incomingBuffer == lenBuffer表示第一次读取,即读取RPC包的前4字节,即包长度
if (incomingBuffer == lenBuffer) {
recvCount++;
// incomingBuffer分配为响应体长度ByteBuffer
readLength();
} else if (!initialized) {
// 读取连接响应包
readConnectResult();
// 注册OP_READ事件
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
// initialized连接后为false,执行一次之后置为true
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
-
sock.read(incomingBuffer)
读取ByteBuffer,incomingBuffer默认4字节,先读取完4字节响应头; -
readLength()
incomingBuffer重新分配为响应体长度ByteBuffer; -
readConnectResult()
读取连接响应体,下一步详细分析; -
enableRead()
注册OP_READ事件; -
initialized = true
默认以及每次连接后置为false,执行一次之后即建立连接之后置为true,以后都执行sendThread.readResponse(incomingBuffer)
逻辑;
12.
readConnectResult读取连接响应体
void readConnectResult() throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
// 反序列化ConnectResponse
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
this.sessionId = conRsp.getSessionId();
// 建立连接后处理,主要处理回调
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
-
deserialize
响应体报文反序列化为ConnectResponse; -
sendThread.onConnected
建立连接后处理,主要处理回调;
13.
queueEvent构造WatcherSetEventPair入队waitingEvents
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(watcher.materialize(event.getState(), event.getType(), event.getPath()), event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
-
new WatcherSetEventPair
构造WatcherSetEventPair(watcher.materialize下一步分析); -
waitingEvents.add(pair)
入队waitingEvents(LinkedBlockingQueue阻塞队列线程安全不需要同步操作);
14.
materialize构造Watcher集合
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
case None:
result.add(defaultWatcher);
boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
state != Watcher.Event.KeeperState.SyncConnected;
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);
}
if (clear) {
dataWatches.clear();
}
}
synchronized(existWatches) {
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}
synchronized(childWatches) {
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}
return result;
......
}
-
result.add(defaultWatcher)
构造ZooKeeper时watcher(ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 3000000,new DemoTest())
;) -
dataWatches/existWatches/childWatches
此时启动时都为空;
14.
EventThread.run阻塞结束,处理WatcherSetEventPair事件
private void processEvent(Object event) {
......
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
// 回调watcher.process方法
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
......
}
-
pair.watchers
遍历处理watcher回调; -
watcher.process(pair.event)
回调watcher.process方法,即示例代码中process方法;
-----over-----
网友评论