Based on ZooKeeper 3.4.14.
ZooKeeper provides the watch mechanism so that a client can listen for events when a node's data changes. Distributed locks and registry centers built on ZooKeeper both rely on this mechanism, which makes change notification very easy to implement.
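To make the mechanism concrete before diving into the source, here is a minimal client-side sketch: it registers a one-shot watch via getData and prints the event when the node changes. The address 127.0.0.1:2181 and the path /demo are assumptions for illustration, and the node is assumed to already exist.

import org.apache.zookeeper.ZooKeeper;

public class WatchDemo {
    public static void main(String[] args) throws Exception {
        // Connect to a local ZooKeeper server; the default watcher only logs session state changes.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000,
                event -> System.out.println("session event: " + event.getState()));

        // getData with a Watcher registers a one-shot watch on /demo (node assumed to exist).
        byte[] data = zk.getData("/demo",
                event -> System.out.println("node event: " + event.getType() + " on " + event.getPath()),
                null);
        System.out.println("current data: " + new String(data));

        Thread.sleep(60_000); // keep the session alive long enough to observe a notification
        zk.close();
    }
}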
Let's start with the client connection, looking at the zkCli.sh script:
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
org.apache.zookeeper.ZooKeeperMain "$@"
Running the command zkCli.sh -server 127.0.0.1:2181 essentially invokes the main method of ZooKeeperMain. This is just the client's entry point; the overall startup process is not complicated and you can read through it yourself, so only the core parts are covered here.
After startup, what actually gets created is a ZooKeeper instance:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly, HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    watchManager = defaultWatchManager();
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    hostProvider = aHostProvider;
    cnxn = createConnection(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    // starts the command-sending thread (sendThread.start()) and the watch event thread (eventThread.start())
    cnxn.start();
}
What ultimately gets created are a SendThread and an EventThread. Let's look at what SendThread is responsible for.
class SendThread extends ZooKeeperThread {
    @Override
    public void run() {
        // ...
        while (state.isAlive()) {
            try {
                // ...
                // send the command packet
                clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                // ...
            }
        }
        // ...
    }
}
SendThread loops sending packets, following a producer/consumer model. Let's see what doTransport does, taking NIO communication as an example:
@Override
void doTransport(
    // ...
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // this is where the packet is actually sent
            doIO(pendingQueue, cnxn);
        }
    }
    // ...
}
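The producer/consumer structure here can be illustrated with a small, self-contained sketch: application threads enqueue packets, and a single dedicated sender thread, playing the role of SendThread, drains the queue and performs the "I/O". The names below (MiniPacket, MiniSender) are illustrative stand-ins, not ZooKeeper classes.

import java.util.concurrent.LinkedBlockingQueue;

public class MiniSender {
    static final class MiniPacket {
        final String payload;
        MiniPacket(String payload) { this.payload = payload; }
    }

    private final LinkedBlockingQueue<MiniPacket> outgoingQueue = new LinkedBlockingQueue<>();

    // Producer side: application threads submit requests.
    public void submit(String payload) {
        outgoingQueue.add(new MiniPacket(payload));
    }

    // Consumer side: a single sender thread loops and "transmits" packets, like SendThread.
    public void start() {
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    MiniPacket p = outgoingQueue.take();          // blocks until a packet is available
                    System.out.println("sending: " + p.payload);  // stand-in for socket I/O
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }, "mini-send-thread");
        sender.setDaemon(true);
        sender.start();
    }

    public static void main(String[] args) throws Exception {
        MiniSender s = new MiniSender();
        s.start();
        s.submit("create /demo");
        s.submit("getData /demo");
        Thread.sleep(200); // give the sender thread time to drain the queue
    }
}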
The client-side flow can be summarized as: ZooKeeperMain creates a ZooKeeper instance, which starts a SendThread that sends request packets over the network and an EventThread that delivers watch notifications back to the registered watchers.
Now let's look at how the server handles watch tasks. In the earlier article on ZooKeeper's leader election source code, we saw that during startup the server creates a NIOServerCnxnFactory (taking NIO as the example), whose main job is handling communication with clients.
Looking at NIOServerCnxnFactory#start, which starts the server thread, tracing the code shows that the final execution path is:
NIOServerCnxn#doIO -> NIOServerCnxn#readPayload -> NIOServerCnxn#readRequest -> ZooKeeperServer#processPacket -> ZooKeeperServer#submitRequest
// processing via the chain-of-responsibility pattern
public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            // hand the request to each processor in the chain
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}
So how is this processor chain built? During server startup, NIOServerCnxnFactory#startup is called, which ends up setting up the chain (in ZooKeeperServer#setupRequestProcessors). Let's look at what the whole chain does.
protected void setupRequestProcessors() {
    // last processor: handles the result
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    // second processor: updates the data
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    // first processor: parses the command
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}
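For reference, here is a minimal sketch of the chain-of-responsibility pattern this method wires up: each stage does its part and hands the request to the next. The stage classes below are illustrative stand-ins, not the real processors.

public class MiniChainDemo {
    interface Processor {
        void process(String request);
    }

    static final class PrepStage implements Processor {
        private final Processor next;
        PrepStage(Processor next) { this.next = next; }
        public void process(String request) {
            System.out.println("prep: validate " + request);
            next.process(request);
        }
    }

    static final class SyncStage implements Processor {
        private final Processor next;
        SyncStage(Processor next) { this.next = next; }
        public void process(String request) {
            System.out.println("sync: log " + request + " to disk");
            next.process(request);
        }
    }

    static final class FinalStage implements Processor {
        public void process(String request) {
            System.out.println("final: apply " + request + " and reply");
        }
    }

    public static void main(String[] args) {
        // Wired back-to-front, just like setupRequestProcessors above.
        Processor first = new PrepStage(new SyncStage(new FinalStage()));
        first.process("create /demo");
    }
}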
Taking the create command as an example, tracing the code eventually reaches:
FinalRequestProcessor#processRequest -> ZooKeeperServer#processTxn -> ZKDatabase#processTxn -> DataTree#processTxn -> DataTree#createNode -> dataWatches.triggerWatch
So the watch event is ultimately fired and sent in triggerWatch:
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // remove the watchers registered on this path (watches fire only once)
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        // remove the path from each watcher's set of watched paths
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    // iterate and notify
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // send the notification response to the client
        w.process(e);
    }
    return watchers;
}
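The two maps touched above (watchTable from path to watchers, watch2Paths from watcher back to paths) and the one-shot removal can be captured in a small simplified sketch. MiniWatchManager below is illustrative only, not ZooKeeper's WatchManager.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MiniWatchManager {
    interface MiniWatcher { void process(String path, String eventType); }

    private final Map<String, Set<MiniWatcher>> watchTable = new HashMap<>();
    private final Map<MiniWatcher, Set<String>> watch2Paths = new HashMap<>();

    public synchronized void addWatch(String path, MiniWatcher w) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(w);
        watch2Paths.computeIfAbsent(w, x -> new HashSet<>()).add(path);
    }

    public Set<MiniWatcher> triggerWatch(String path, String eventType) {
        Set<MiniWatcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);          // one-shot: drop the path entry
            if (watchers == null) {
                return null;
            }
            for (MiniWatcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);  // keep the reverse index consistent
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (MiniWatcher w : watchers) {
            w.process(path, eventType);                  // notify outside the lock
        }
        return watchers;
    }

    public static void main(String[] args) {
        MiniWatchManager wm = new MiniWatchManager();
        wm.addWatch("/demo", (path, type) -> System.out.println(type + " on " + path));
        wm.triggerWatch("/demo", "NodeCreated");     // fires once
        wm.triggerWatch("/demo", "NodeDataChanged"); // no watchers left, returns null
    }
}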
To summarize, the server-side flow is: NIOServerCnxnFactory accepts the connection and reads the request, ZooKeeperServer submits it to the processor chain (PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor), and when the transaction is applied to the DataTree the registered watches are fired via triggerWatch.
Summary
The full source walk-through involves a lot of code, so this summary focuses on the core of how the watch mechanism is implemented on the server and client sides. To conserve resources, the server is designed so that a watch fires only once; whether to watch again is left to the client to decide, which spares the server the extra burden of maintaining and storing long-lived watch bindings.
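One client-side consequence of the one-shot design: if you want continuous notifications, you must register the watch again inside the callback. A minimal sketch of that pattern (the address 127.0.0.1:2181 and the path /demo are assumptions for illustration):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class RewatchDemo implements Watcher {
    private final ZooKeeper zk;
    private final String path;

    RewatchDemo(ZooKeeper zk, String path) throws Exception {
        this.zk = zk;
        this.path = path;
        zk.exists(path, this); // initial registration of the watch
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("event: " + event.getType() + " on " + event.getPath());
        if (event.getType() == Event.EventType.None) {
            return; // session/state events carry no path, nothing to re-register
        }
        try {
            // The server dropped the watch when it fired (watchTable.remove(path) above),
            // so register again to keep listening.
            zk.exists(path, this);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        new RewatchDemo(zk, "/demo");
        Thread.sleep(60_000); // change /demo from another client to see repeated notifications
        zk.close();
    }
}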