ZooKeeper
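The ZooKeeper class is the main class of the client library. To use the ZooKeeper service, an application must first instantiate an object of the ZooKeeper class. All subsequent operations are performed by calling methods of the ZooKeeper class. Unless otherwise noted, the methods of this class are thread-safe.
Once a connection to a server is established, the client is assigned a session ID (sessionId). The client periodically sends heartbeats to the server to keep the session valid.
The application can call ZooKeeper APIs through the client as long as the client's session ID remains valid.
If for some reason the client fails to send heartbeats to the server for a prolonged period (for example, longer than sessionTimeout), the server expires the session and the session ID becomes invalid. The client object is then no longer usable. To make further ZooKeeper API calls, the application must create a new client object.
If the ZooKeeper server the client is currently connected to fails or stops responding, the client automatically tries to connect to another server before its session ID expires. If this succeeds, the application can continue to use the client.
ZooKeeper API methods are either synchronous or asynchronous. Synchronous methods block until the server has responded. Asynchronous methods simply queue the request for sending and return immediately. They take a callback object that is executed either when the request succeeds or when it fails, in which case a return code (rc) indicates the error.
Some successful ZooKeeper API calls can register a watch on a "data node" in the ZooKeeper server. Other successful API calls that modify that data node can trigger these watches. Once a watch is triggered, an event is delivered to the client that registered it. Each watch can be triggered only once, so at most one event is delivered to the client for each watch it registers.
To process the events delivered to it, a client needs an object of a class implementing the Watcher interface. When a client drops its current connection and reconnects to a server, all existing watches are considered triggered, but any undelivered events are lost. To emulate this, the client generates a special event that tells the event handler the connection has been dropped. This special event has type EventType.None and state KeeperState.Disconnected.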
Constructor
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
- watchManager: the event-listener (watch) manager
protected ZKWatchManager defaultWatchManager() {
    return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}
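- hostProvider: load balancing across the server addresses. When the client reconnects, it calls the provider's next method to get the address to connect to. The default provider is round-robin:
private static HostProvider createDefaultHostProvider(String connectString) {
    return new StaticHostProvider(
            new ConnectStringParser(connectString).getServerAddresses());
}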
- cnxn: the ClientCnxn instance, the core of I/O handling (the snippet below comes from the constructor overload that also takes a sessionId and sessionPasswd)
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
        hostProvider, sessionTimeout, this, watchManager,
        getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.start();
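The sketch below makes the round-robin behaviour of hostProvider concrete. RoundRobinHosts is a hypothetical class, not ZooKeeper's StaticHostProvider. It only cycles through a fixed address list and leaves out anything else the real provider may do, such as reordering the list or pausing between attempts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin picker illustrating what hostProvider.next() does by default.
final class RoundRobinHosts {
    private final List<String> hosts;
    private int index = -1;

    RoundRobinHosts(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = new ArrayList<>(hosts);
    }

    int size() {
        return hosts.size();
    }

    // Returns the next address and wraps around to the first one after the last.
    String next() {
        index = (index + 1) % hosts.size();
        return hosts.get(index);
    }
}
```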
In the ClientCnxn constructor:
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;
    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;
    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
    this.clientConfig = zooKeeper.getClientConfig();
}
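It sets fields such as zooKeeper, watcher, sessionTimeout and chrootPath, derives connectTimeout and readTimeout from sessionTimeout, and creates two threads. The threads are started later by start():
- sendThread: the I/O thread, which sends data taken from the queue
- eventThread: the event thread, which processes events received from the server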
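The two timeouts derived in the constructor are easy to check by hand. ClientTimeouts is a hypothetical helper, not a ZooKeeper class. It restates connectTimeout = sessionTimeout / hostProvider.size() and readTimeout = sessionTimeout * 2 / 3 using the same integer arithmetic.

```java
// Hypothetical helper restating the two timeout formulas from the ClientCnxn constructor.
final class ClientTimeouts {
    private ClientTimeouts() {}

    // connectTimeout: the session timeout divided evenly across the known servers.
    static int connectTimeout(int sessionTimeout, int hostCount) {
        return sessionTimeout / hostCount;
    }

    // readTimeout: two thirds of the session timeout (integer division, as in the constructor).
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }
}
```

For example, with sessionTimeout = 30000 ms and three servers, connectTimeout(30000, 3) is 10000 ms and readTimeout(30000) is 20000 ms. Dividing the session timeout by the number of servers presumably ensures that each server can be tried once before the session could expire.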
There are also two queues:
/**
 * These are the packets that have been sent and are waiting for a response.
 */
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

/**
 * These are the packets that need to be sent.
 */
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
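Every zk request and its response is ultimately carried by a Packet, which holds all the data the request and the response need. During communication, sendThread takes a Packet from outgoingQueue, sends it, and then puts it into pendingQueue to wait for the response.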
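The following sketch models the hand-off between the two queues. TwoQueueModel and MiniPacket are hypothetical classes, not ZooKeeper code. The model assigns an xid at send time, moves the packet from the outgoing queue to the pending queue, and expects replies in request order, so each reply must match the head of the pending queue. Ping and auth packets, which bypass pendingQueue in the real client, are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedList;

// Hypothetical model of the outgoingQueue -> pendingQueue hand-off; not ZooKeeper code.
final class TwoQueueModel {
    static final class MiniPacket {
        final String op;
        int xid; // assigned when the packet is actually sent
        MiniPacket(String op) { this.op = op; }
    }

    private final Deque<MiniPacket> outgoingQueue = new ArrayDeque<>();
    private final LinkedList<MiniPacket> pendingQueue = new LinkedList<>();
    private int nextXid = 1;

    // Caller side: only enqueue, as queuePacket does.
    void queue(MiniPacket p) { outgoingQueue.add(p); }

    // I/O side: assign an xid, "write" the packet, then park it in pendingQueue.
    MiniPacket sendOne() {
        MiniPacket p = outgoingQueue.poll();
        if (p == null) {
            return null;
        }
        p.xid = nextXid++;
        pendingQueue.add(p);
        return p;
    }

    // Reply side: replies arrive in request order, so the head of pendingQueue must match.
    MiniPacket onReply(int replyXid) {
        MiniPacket head = pendingQueue.poll();
        if (head == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
        }
        if (head.xid != replyXid) {
            throw new IllegalStateException("xid mismatch: expected " + head.xid + ", got " + replyXid);
        }
        return head;
    }

    int pendingCount() { return pendingQueue.size(); }
}
```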
The start method starts the two threads:
public void start() {
    sendThread.start();
    eventThread.start();
}
getData
There are two variants, synchronous and asynchronous:
byte[] getData(final String path, Watcher watcher, Stat stat)
void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
Synchronous getData
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
// 1
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
// 2
final String serverPath = prependChroot(clientPath);
// 3
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// 4
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
// 5
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
// 6
return response.getData();
}
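- Watcher: if a watcher was passed in, wrap it in a DataWatchRegistration
- Prepend the chroot path. The chroot is extracted from the connect string when the server addresses are parsed
- Build the request
- Submit the request. This is synchronous and waits for the result
- Copy the Stat from the response into the stat argument
- Return the data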
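The chroot handling can be sketched in isolation. ChrootPaths is a hypothetical helper that mirrors three steps from this walkthrough. It parses the chroot from the connect string (everything from the first '/', where a bare "/" means no chroot). It prepends the chroot to client paths before a request is built, as prependChroot does. It strips the chroot from server paths in notifications, as readResponse does. Path validation is omitted.

```java
// Hypothetical helper mirroring the chroot handling; not ZooKeeper code.
final class ChrootPaths {
    private ChrootPaths() {}

    // "h1:2181,h2:2181/app" -> "/app"; no suffix or a bare "/" means no chroot (null).
    static String parseChroot(String connectString) {
        int off = connectString.indexOf('/');
        if (off < 0) {
            return null;
        }
        String chroot = connectString.substring(off);
        return chroot.length() == 1 ? null : chroot;
    }

    // Client path -> server path, applied before the request is built.
    static String prependChroot(String chroot, String clientPath) {
        if (chroot == null) {
            return clientPath;
        }
        // the client's "/" is the chroot node itself
        return clientPath.length() == 1 ? chroot : chroot + clientPath;
    }

    // Server path -> client path, applied to incoming watch notifications.
    static String stripChroot(String chroot, String serverPath) {
        if (chroot == null) {
            return serverPath;
        }
        if (serverPath.equals(chroot)) {
            return "/";
        }
        if (serverPath.length() > chroot.length()) {
            return serverPath.substring(chroot.length());
        }
        return serverPath; // unexpected path: the original code only logs and leaves it unchanged
    }
}
```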
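So the key step is submitting the request.
Submitting the request
submitRequest takes the request header, the request body, the response body and the watch registration. The parameters alone show that both request processing and watcher registration happen in this step. (getData calls the four-argument overload, which passes null for watchDeregistration.)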
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration, watchDeregistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// The xid is not assigned here; it is assigned later, when the packet is actually sent
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// 1
outgoingQueue.add(packet);
}
}
// 2
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
- Add the packet to the send queue (outgoingQueue)
- Wake up the send thread. sendThread is the core I/O thread
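A synchronous call is layered on top of this asynchronous queue. submitRequest waits on the packet's monitor until finishPacket sets finished and calls notifyAll. SyncOverAsync is a hypothetical, much-reduced model of that hand-off, with a blocking queue standing in for outgoingQueue and a fake reply. It is not ZooKeeper code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the submitRequest / finishPacket hand-off; not ZooKeeper code.
final class SyncOverAsync {
    static final class Pending {
        final String request;
        String reply;      // written by the I/O thread under this object's monitor
        boolean finished;  // guarded by this object's monitor
        Pending(String request) { this.request = request; }
    }

    final BlockingQueue<Pending> outgoing = new LinkedBlockingQueue<>();

    // Caller thread: enqueue, then block until the packet is finished (like submitRequest).
    String submit(String request) throws InterruptedException {
        Pending p = new Pending(request);
        outgoing.add(p);
        synchronized (p) {
            while (!p.finished) { // the loop also guards against spurious wake-ups
                p.wait();
            }
        }
        return p.reply;
    }

    // I/O thread: take one request, "answer" it, and wake the caller (like finishPacket with cb == null).
    void serveOne() throws InterruptedException {
        Pending p = outgoing.take();
        synchronized (p) {
            p.reply = "reply-to-" + p.request;
            p.finished = true;
            p.notifyAll();
        }
    }
}
```

The while loop in submit matters. The I/O thread may finish the packet before the caller reaches wait(), and in that case the caller never waits at all.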
SendThread IO
Sending data
The run method of SendThread sends the messages. Its core logic:
- Check whether the socket is connected, and connect if not:
if (!clientCnxnSocket.isConnected()) {
    startConnect(); // connect
    clientCnxnSocket.updateLastSendAndHeard();
}
- If connected, check whether a heartbeat needs to be sent:
if (state.isConnected()) {
    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend()
            - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
        sendPing(); // heartbeat
        clientCnxnSocket.updateLastSend(); // update the last-send time
    } else {
        if (timeToNextPing < to) {
            to = timeToNextPing;
        }
    }
}
- Send data:
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
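The heartbeat decision can be worked through with concrete numbers. PingSchedule is a hypothetical helper that restates the check above. The value of MAX_SEND_PING_INTERVAL is not shown in this walkthrough, so the 10000 ms used here is an assumption that should be checked against your ZooKeeper version.

```java
// Hypothetical helper restating the heartbeat check from SendThread.run(); not ZooKeeper code.
final class PingSchedule {
    // ASSUMPTION: value of MAX_SEND_PING_INTERVAL in ms; verify against your ZooKeeper version.
    static final int MAX_SEND_PING_INTERVAL = 10000;

    private PingSchedule() {}

    // Milliseconds until the next ping is due. Once the connection has been idle
    // for more than 1000 ms, an extra second is subtracted as a safety margin.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    // A ping is sent when it is due, or when nothing has been sent for too long.
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }
}
```

With sessionTimeout = 30000 ms, readTimeout is 20000 ms. After 1500 ms without sending, the next ping is 10000 - 1500 - 1000 = 7500 ms away. After 9000 ms idle it is due, so the client pings at most about every 9 seconds while it has nothing else to send. The MAX_SEND_PING_INTERVAL cap only takes effect for larger session timeouts, where readTimeout / 2 - 1000 exceeds it.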
ClientCnxnSocket implementations:
- ClientCnxnSocketNIO: used by default
- ClientCnxnSocketNetty
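In ClientCnxnSocketNIO, doTransport is ordinary NIO handling. It first waits in selector.select(...) (not shown) and then processes the selected keys: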
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, cnxn);
}
}
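The core is the doIO method. It is entered when the key is readable or writable, so it handles both read and write events. Handling a read event consumes packets from pendingQueue. Writes come first.
Write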
if (sockKey.isWritable()) {
// 1
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
//2
p.createBB();
}
//3
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
// 4
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off the write-interest flag
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
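// On the initial connection, write the complete connect request packet, then disable further
// writes until a successful connect response is received.
// If the session has expired, the server sends the expiration response and immediately closes its end of the socket.
// If the client is writing on its end at the same time, the TCP stack may abort with an RST, in which case
// the client would never receive the session-expired event.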
disableWrite();
} else {
// Just in case
enableWrite();
}
}
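- Take a sendable Packet from outgoingQueue
- If writing has not started yet (p.bb is null), assign an xid unless it is a ping or auth packet, then create the buffer and serialize the request header and body into it. Note: for the watch, only a boolean flag is written:
a_.writeBool(watch,"watch");
- Write the buffer to the socket
- Once the whole buffer has been written, move the packet to pendingQueue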
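Why does a packet keep its buffer (p.bb) and stay in outgoingQueue until !p.bb.hasRemaining()? A non-blocking socket may accept only part of a buffer per write call. The hypothetical demo below simulates this with a fake channel that takes at most a fixed number of bytes per call. Each loop iteration stands in for one "writable" selector event. The channel and its limit are illustrative assumptions, not ZooKeeper code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical demo of partial writes; not ZooKeeper code.
final class PartialWriteDemo {
    // A fake channel that accepts at most `limit` bytes per write call.
    static final class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int limit;

        ThrottledChannel(int limit) { this.limit = limit; }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        @Override public boolean isOpen() { return true; }
        @Override public void close() {}
    }

    // Writes until the buffer is drained and returns how many write calls it took.
    // The buffer's position remembers progress between calls, just as p.bb does.
    static int drain(ByteBuffer bb, WritableByteChannel ch) throws IOException {
        int calls = 0;
        while (bb.hasRemaining()) { // same test as !p.bb.hasRemaining() in doIO
            ch.write(bb);
            calls++;
        }
        return calls;
    }
}
```

Writing the 11-byte "hello world" through a channel limited to 4 bytes per call takes three calls (4 + 4 + 3 bytes). Only after the last call would the real client move the packet to pendingQueue.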
Read:
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("...");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
// 1
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
//2
sendThread.readResponse(incomingBuffer);
//3
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
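- First read the 4-byte packet length into lenBuffer, then (in readLength) reallocate incomingBuffer to exactly that length. This length-prefixed framing handles TCP packets being split or coalesced
- Once incomingBuffer is full, read the response from it and process it
- Clear lenBuffer and point incomingBuffer back at it, ready to read the next length
Reading and handling the response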
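The lenBuffer / incomingBuffer switch can be shown on its own. LengthPrefixDecoder is a hypothetical decoder, not ZooKeeper code. It accepts bytes in arbitrary chunks, as successive socket reads might deliver them, and returns each complete frame body. The length is read as a big-endian int, which is ByteBuffer's default order. The real client also rejects lengths above a configured maximum, and that check is omitted here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for 4-byte length-prefixed frames, mirroring lenBuffer/incomingBuffer in doIO.
final class LengthPrefixDecoder {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incoming = lenBuffer;

    // Feeds one chunk of bytes and returns every frame body it completes.
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            if (!incoming.hasRemaining()) {
                incoming.flip();
                if (incoming == lenBuffer) {
                    int len = lenBuffer.getInt(); // big-endian length prefix
                    if (len < 0) {
                        throw new IllegalStateException("Packet len " + len + " is out of range");
                    }
                    incoming = ByteBuffer.allocate(len); // room for exactly one body
                } else {
                    byte[] body = new byte[incoming.remaining()];
                    incoming.get(body);
                    frames.add(body);
                    lenBuffer.clear();
                    incoming = lenBuffer; // the next bytes are a length prefix again
                }
                continue;
            }
            if (!chunk.hasRemaining()) {
                return frames;
            }
            // copy only as many bytes as the current target buffer still needs
            int n = Math.min(incoming.remaining(), chunk.remaining());
            ByteBuffer part = chunk.duplicate();
            part.limit(part.position() + n);
            incoming.put(part);
            chunk.position(chunk.position() + n);
        }
    }
}
```

Feeding two frames ("abc" and "hello") in 3-byte chunks still yields exactly those two bodies. Chunk boundaries fall in the middle of both the length prefixes and the bodies.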
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
//1
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
//2
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
return;
}
if (replyHdr.getXid() == -1) {
// -1 means notification
// notification event: deserialize it
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
//log
}
}
WatchedEvent we = new WatchedEvent(event);
// 3 enqueue the event
eventThread.queueEvent( we );
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/* Since requests are processed in order, this reply must belong to the first pending request! */
// 4
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("...");
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
}
} finally {
// 5
finishPacket(packet);
}
}
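- Deserialize the reply header. Replies with xid -2 (pings) are simply dropped
- On authentication failure (xid -4 with AUTHFAILED), set the state to AUTH_FAILED and turn the failure into an AuthFailed event for eventThread to handle
- If the reply is a notification (xid -1), hand the event to eventThread, which will trigger the watchers
- Check that the xid matches the packet at the head of pendingQueue, then deserialize the response body
- Finish the packet: register watchers and otherwise process the response data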
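The special xids checked at the top of readResponse can be summarized as a small routing table. ReplyRouting is a hypothetical helper that restates the branches above. The SASL branch, which depends on connection state rather than on the xid, is left out.

```java
// Hypothetical summary of the xid routing at the top of readResponse; not ZooKeeper code.
final class ReplyRouting {
    enum Kind { PING, AUTH, NOTIFICATION, REQUEST_REPLY }

    private ReplyRouting() {}

    static Kind classify(int xid) {
        switch (xid) {
            case -2: return Kind.PING;          // ping reply: dropped
            case -4: return Kind.AUTH;          // AuthPacket reply: AuthFailed event on error
            case -1: return Kind.NOTIFICATION;  // watch event: queued to EventThread
            default: return Kind.REQUEST_REPLY; // must match the head of pendingQueue
        }
    }
}
```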
private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
//1
p.watchRegistration.register(err);
}
if (p.watchDeregistration != null) {
// Add all removed watch events to the event queue,
// so that the client is notified with the "Data / Child WatchRemoved" event types.
// ...
}
// 2
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
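- Register the watcher
- Check whether there is a callback, i.e. whether the request is asynchronous. A synchronous request is marked finished, and notifyAll wakes the thread blocked in submitRequest. An asynchronous one is handed to the EventThread, which runs the callback
Registering watchers
This calls the register method of watchRegistration: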
abstract protected Map<String, Set<Watcher>> getWatches(int rc);
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
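In other words, the watcher is added to the set for clientPath in the map returned by getWatches, which is abstract. The concrete WatchRegistration subclass is chosen when the request is built, for example in getData: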
wcb = new DataWatchRegistration(watcher, clientPath);
class DataWatchRegistration extends WatchRegistration {
public DataWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}
}
That is, it uses the watchManager's dataWatches. The watchManager keeps several watcher maps and stores each type of watcher in its own map:
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
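Invoking the watchers
When reading responses, we saw that a notification is passed to EventThread's queueEvent method, which adds it to waitingEvents for the EventThread to process. In addition, as mentioned under finishPacket, asynchronous zk API calls go through queuePacket, which hands the Packet to the EventThread so it can run the callback.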
private void queueEvent(WatchedEvent event,
Set<Watcher> materializedWatchers) {
...
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// find the watchers registered for this event
watchers = watcher.materialize(event.getState(),
event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
waitingEvents.add(pair);
}
public void queuePacket(Packet packet) {
...
waitingEvents.add(packet);
}
public void queueCallback(AsyncCallback cb, int rc, String path,
Object ctx) {
waitingEvents.add(new LocalCallback(cb, rc, path, ctx));
}
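waitingEvents holds three kinds of items:
- WatcherSetEventPair: an event sent by the server, which triggers the local watchers
- Packet: an asynchronous API call whose callback must be invoked
- LocalCallback: (not encountered in this walkthrough)
The key step is finding the watchers. materialize is implemented by ZKWatchManager, the watchManager created in the ZooKeeper constructor and passed to ClientCnxn as its watcher: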
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath) {
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
// ...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
// ...
}
return result;
}
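Note: addTo(dataWatches.remove(clientPath), result); calls remove, so the watchers registered for a path are taken out of the map when they fire, and each registration is triggered only once. The watchers for clientPath are collected into result, which is returned.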
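The one-shot behaviour follows directly from register adding to a map and materialize removing from it. OneShotWatches is a hypothetical mini watch manager that covers data watches only, with Consumer<String> standing in for Watcher. Its rc check assumes that the default shouldAddWatch accepts only rc == 0. The body of shouldAddWatch is not shown in this walkthrough, so treat that as an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical mini watch manager for data watches only; not ZooKeeper code.
final class OneShotWatches {
    private final Map<String, Set<Consumer<String>>> dataWatches = new HashMap<>();

    // Mirrors WatchRegistration.register: only a successful request leaves a watch
    // (ASSUMPTION: rc == 0 is the default shouldAddWatch condition).
    void register(int rc, String path, Consumer<String> watcher) {
        if (rc != 0) {
            return;
        }
        synchronized (dataWatches) {
            dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        }
    }

    // Mirrors the NodeDataChanged branch of materialize: remove() hands the watchers
    // over exactly once, so a second event for the same path finds nothing.
    Set<Consumer<String>> materialize(String path) {
        synchronized (dataWatches) {
            Set<Consumer<String>> removed = dataWatches.remove(path);
            return removed == null ? Collections.emptySet() : removed;
        }
    }
}
```

A client that wants continuous notifications must therefore register the watch again, typically by calling getData with a watcher from inside the watcher's own callback.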
Once items are queued, the EventThread's run method processes the queue. The core is the processEvent method:
Object event = waitingEvents.take();
processEvent(event);
- Take an item from the queue
- Process it
Core logic of processEvent:
if (event instanceof WatcherSetEventPair) {//WatcherSetEventPair
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
watcher.process(pair.event);
}
} else if (event instanceof LocalCallback) {//LocalCallback
LocalCallback lcb = (LocalCallback) event;
if (lcb.cb instanceof StatCallback) {
((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null);
} else if (lcb.cb instanceof ...) {
// other callback types
}
} else { // Packet: asynchronous API call
Packet p = (Packet) event;
// handle the rc and invoke the callback according to its type
// ...
}
As shown, each kind of item is handled by its own branch.
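The essential property of this design is that one thread drains one queue. Watcher notifications and asynchronous callbacks therefore run serially, in the order they were queued. MiniEventThread is a hypothetical reduction of the EventThread. Its two item types stand in for WatcherSetEventPair and Packet, and it uses a stop marker object to end the loop. None of these names are ZooKeeper APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-consumer dispatcher mirroring the EventThread loop; not ZooKeeper code.
final class MiniEventThread extends Thread {
    static final class WatchEvent {
        final String path;
        WatchEvent(String path) { this.path = path; }
    }

    static final class AsyncResult {
        final int rc;
        final String path;
        AsyncResult(int rc, String path) { this.rc = rc; this.path = path; }
    }

    private static final Object STOP = new Object(); // hypothetical stop marker

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> log = new ArrayList<>(); // written only by this thread

    void queue(Object item) { waitingEvents.add(item); }

    void shutdown() { waitingEvents.add(STOP); }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == STOP) {
                    return;
                }
                processEvent(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // instanceof dispatch, as in processEvent
    private void processEvent(Object event) {
        if (event instanceof WatchEvent) {
            WatchEvent we = (WatchEvent) event;
            log.add("watch " + we.path);
        } else if (event instanceof AsyncResult) {
            AsyncResult ar = (AsyncResult) event;
            log.add("callback rc=" + ar.rc + " " + ar.path);
        }
    }
}
```

One consequence of this design is that a slow watcher or callback delays every item queued behind it, so user code running on this thread should return quickly.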