监听机制是Zookeeper的一个重要特性,例如:Zookeeper实现的高可用集群、分布式锁,就利用到了这一特性。
在Zookeeper被监听的结点对象/信息发生了改变,就会触发监听机制,通知注册者。
注册监听机制
创建客户端,创建默认监听器
在创建zookeeper客户端实例时,需要下列参数。
new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
三个参数分别的含义为:
connectString 服务端地址 sessionTimeout:超时时间 Watcher:监控器
这个 Watcher 将作为整个 ZooKeeper 会话期间的上下文 ,一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中,==在开启对某个节点或信息的监控后,但是并没有指定额外的监控器==,则会默认调用这个监控器的方法。
对指定结点进行特殊监听处理
除此之外,ZooKeeper 客户端也可以通过 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher,从而方便地在不同的情况下添加 Watch 事件:
getData(String path, Watcher watcher, Stat stat)
Zookeeper只能在成功连接上客户端后,才能使得监控机制起作用;且仅支持4种事件的监听。
- 结点的增加
- 结点的删除
- 结点所携带信息的更改
- 结点的子结点的更改
底层原理
Zookeeper监听机制是观察者模式实现的。
image在观察者模式中,最重要的一个属性就是需要一个列表用于保存观察者。
而在Zookeeper监听机制中,也实现了这个一个列表,在客户端和服务端分别维护了ZKWatchManager和WatchManager。
客户端Watch注册实现过程
在发送一个Watch事件的会话请求时,Zookeeper客户端主要做了两件事
- 标记该会话是一个带有 Watch 事件的请求
- 将 Watch 事件存储到 ZKWatchManager
以 getData 接口为例。当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系:
public byte[] getData(final String path, Watcher watcher, Stat stat){
...
WatchRegistration wcb = null;
// 如果watcher不为null,即有watcher对象
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
RequestHeader h = new RequestHeader();
// 标记请求为带有监听器的
request.setWatch(watcher != null);
...
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
}
之后客户端向服务器发送请求时,是将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中:
public Packet queuePacket(RequestHeader h, ReplyHeader r,...) {
Packet packet = null;
...
packet = new Packet(h, r, request, response, watchRegistration);
...
outgoingQueue.add(packet);
...
return packet;
}
最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中:
private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
...
}
服务端 Watch 注册实现过程
Zookeeper 服务端处理 Watch 事件基本有 2 个过程:
- 解析收到的请求是否带有 Watch 注册事件
- 将对应的 Watch 事件存储到 WatchManager
服务端 Watch 事件的触发过程
以 setData 接口即“节点数据内容发生变更”事件为例。
在 setData 方法内部执行完对节点数据的变更后,会调用 WatchManager.triggerWatch 方法触发数据变更事件。
Set<Watcher> triggerWatch(String path, EventType type...) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
...
for (Watcher w : watchers) {
Set<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
watchers与paths的关系:
双向绑定关系。
由于zk的监听机制是一次性的(触发即销毁),当path2触发了监听事件后,立马从watchTable中销毁监听事件,获取watchers;并且path2结点的事件已经出发了,所以也要将每个watcher对应的paths中去除path2;然后调用watchers中每个watcher的process()函数完成一次监听回调。
image客户端回调的处理过程
SendThread
此方法是客户端用于处理服务端的统一请求,replyHdr.getXid()值为-1时,则响应为通知类型的信息,最后调用eventThread.queueEvent()将事件交由eventThread处理。
if (replyHdr.getXid() == -1) {
...
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
...
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
...
event.setPath(serverPath.substring(chrootPath.length()));
...
}
WatchedEvent we = new WatchedEvent(event);
...
eventThread.queueEvent( we );
}
EventThread
根据触发的事件类型,去监听器列表查询对应的路径所对应的监听器,并统一放到集合result中,由于Zookeeper事件是一次触发即销毁,所以也要从watchManager中移除监听器。
public Set<Watcher> materialize(...)
{
Set<Watcher> result = new HashSet<Watcher>();
...
switch (type) {
...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
....
}
return result;
}
完成了对监听器的取出后,将查询到Watcher放到对应的waitEvents任务队列中,调用 EventThread 类中的 run 方法对事件进行处理。
而处理事件,无非就是执行我们注册事件时,写下的process()函数。
总结
Zookeeper的监听机制是基于观察者模式设计的。其方式就是通过在客户端和服务端都维护一张表(zkWatcherManager、watcherManager),用于存放监听器对象。
注册监听器过程,就是在调用接口的过程,将监听器进行注册,首先在本地客户端进行一个注册管理,然后传递服务端之后,又根据是否含有监听器,在服务端进行注册管理。
触发监听事件的过程:
- 服务端,通过触发的路径path,通过watcherManager找到对应的监听器集合,通过调用process()方法将信息发送至每个监听器原来的客户端;
- 客户端,通过判断是否是通知事件,通过zkWatcherManager找到对应的监听器集合,通过调用process()方法将执行对应的应答处理。
Watcher的客户端实现和服务端使用不同的实现
当在监听事件触发之后,客户端和服务端几乎都做了同样的事(通过Path找到Watcher然后执行process()),但是他们做的事不同的事,服务端的Watcher的process()的作用是将path和触发的event发送至客户端,然后再次通过path和event找到watcher执行process(),这时候,执行的代码即为开发者所需要执行的==监听事件对应的应答处理process()==。
思考 -> 为什么Zookeeper要维护两份Watcher清单(zkWatcherManager + WatcherManager)?
使用反证法,来证明这样设计的优秀之处。
- 假设只在客户端维护Watcher清单,当服务端的事件触发之后,服务端没有Watcher清单,不知道是哪几个客户端订阅了这个事件,只能将事件发送给所有的客户端,既浪费了带宽,也浪费了客户端处理响应的资源。
- 假设只在服务端维护Watcher清单,当服务端的事件触发之后,服务端发送给订阅了该事件的客户端,客户端确会因为没有Watcher对象,而无法执行对应的事件应答处理,导致需要服务端将对应的处理方法,通过网络传递,则会加重网络的传输压力。
作者:eddieVim
链接:https://juejin.cn/post/6903693927434420232
网友评论