美文网首页Java
阿里P8带你从源码级别——深挖Zookeeper监听机制

阿里P8带你从源码级别——深挖Zookeeper监听机制

作者: 程序花生 | 来源:发表于2020-12-08 13:53 被阅读0次

    监听机制是Zookeeper的一个重要特性,例如:Zookeeper实现的高可用集群、分布式锁,就利用到了这一特性。

    在Zookeeper被监听的结点对象/信息发生了改变,就会触发监听机制,通知注册者。

    注册监听机制

    创建客户端,创建默认监听器

    在创建zookeeper客户端实例时,需要下列参数。

    new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    

    三个参数分别的含义为:

    connectString 服务端地址 sessionTimeout:超时时间 Watcher:监控器

    这个 Watcher 将作为整个 ZooKeeper 会话期间的上下文 ,一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中,==在开启对某个节点或信息的监控后,但是并没有指定额外的监控器==,则会默认调用这个监控器的方法。

    对指定结点进行特殊监听处理

    除此之外,ZooKeeper 客户端也可以通过 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher,从而方便地在不同的情况下添加 Watch 事件:

    getData(String path, Watcher watcher, Stat stat)
    

    Zookeeper只能在成功连接上客户端后,才能使得监控机制起作用;且仅支持4种事件的监听。

    1. 结点的增加
    2. 结点的删除
    3. 结点所携带信息的更改
    4. 结点的子结点的更改

    底层原理

    Zookeeper监听机制是观察者模式实现的。

    image

    在观察者模式中,最重要的一个属性就是需要一个列表用于保存观察者。

    而在Zookeeper监听机制中,也实现了这个一个列表,在客户端和服务端分别维护了ZKWatchManager和WatchManager。

    客户端Watch注册实现过程

    在发送一个Watch事件的会话请求时,Zookeeper客户端主要做了两件事

    • 标记该会话是一个带有 Watch 事件的请求
    • 将 Watch 事件存储到 ZKWatchManager

    以 getData 接口为例。当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系:

    public byte[] getData(final String path, Watcher watcher, Stat stat){
        ...
        WatchRegistration wcb = null;
        // 如果watcher不为null,即有watcher对象
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
        RequestHeader h = new RequestHeader();
        // 标记请求为带有监听器的
        request.setWatch(watcher != null);
        ...
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    }
    

    之后客户端向服务器发送请求时,是将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中:

    public Packet queuePacket(RequestHeader h, ReplyHeader r,...) {
        Packet packet = null;
        ...
        packet = new Packet(h, r, request, response, watchRegistration);
        ...
        outgoingQueue.add(packet); 
        ...
        return packet;
    }
    

    最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中:

    private void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        ...
    }
    

    服务端 Watch 注册实现过程

    Zookeeper 服务端处理 Watch 事件基本有 2 个过程:

    • 解析收到的请求是否带有 Watch 注册事件
    • 将对应的 Watch 事件存储到 WatchManager

    服务端 Watch 事件的触发过程

    以 setData 接口即“节点数据内容发生变更”事件为例。

    在 setData 方法内部执行完对节点数据的变更后,会调用 WatchManager.triggerWatch 方法触发数据变更事件。

    Set<Watcher> triggerWatch(String path, EventType type...) {
        WatchedEvent e = new WatchedEvent(type,
                                          KeeperState.SyncConnected, path);
        Set<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            ...
                for (Watcher w : watchers) {
                    Set<String> paths = watch2Paths.get(w);
                    if (paths != null) {
                        paths.remove(path);
                    }
                }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }
    

    watchers与paths的关系:

    双向绑定关系。

    由于zk的监听机制是一次性的(触发即销毁),当path2触发了监听事件后,立马从watchTable中销毁监听事件,获取watchers;并且path2结点的事件已经出发了,所以也要将每个watcher对应的paths中去除path2;然后调用watchers中每个watcher的process()函数完成一次监听回调。

    image

    客户端回调的处理过程

    SendThread

    此方法是客户端用于处理服务端的统一请求,replyHdr.getXid()值为-1时,则响应为通知类型的信息,最后调用eventThread.queueEvent()将事件交由eventThread处理。

    if (replyHdr.getXid() == -1) {
        ...
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        ...
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
                ...
                event.setPath(serverPath.substring(chrootPath.length()));
                ...
        }
        WatchedEvent we = new WatchedEvent(event);
        ...
        eventThread.queueEvent( we );
    }
    

    EventThread

    根据触发的事件类型,去监听器列表查询对应的路径所对应的监听器,并统一放到集合result中,由于Zookeeper事件是一次触发即销毁,所以也要从watchManager中移除监听器。

    public Set<Watcher> materialize(...)
    {
        Set<Watcher> result = new HashSet<Watcher>();
        ...
        switch (type) {
        ...
        case NodeDataChanged:
        case NodeCreated:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        ....
        }
        return result;
    }
    

    完成了对监听器的取出后,将查询到Watcher放到对应的waitEvents任务队列中,调用 EventThread 类中的 run 方法对事件进行处理。

    而处理事件,无非就是执行我们注册事件时,写下的process()函数。

    总结

    Zookeeper的监听机制是基于观察者模式设计的。其方式就是通过在客户端和服务端都维护一张表(zkWatcherManager、watcherManager),用于存放监听器对象。

    注册监听器过程,就是在调用接口的过程,将监听器进行注册,首先在本地客户端进行一个注册管理,然后传递服务端之后,又根据是否含有监听器,在服务端进行注册管理。

    触发监听事件的过程:

    1. 服务端,通过触发的路径path,通过watcherManager找到对应的监听器集合,通过调用process()方法将信息发送至每个监听器原来的客户端;
    2. 客户端,通过判断是否是通知事件,通过zkWatcherManager找到对应的监听器集合,通过调用process()方法将执行对应的应答处理。
    image

    Watcher的客户端实现和服务端使用不同的实现

    当在监听事件触发之后,客户端和服务端几乎都做了同样的事(通过Path找到Watcher然后执行process()),但是他们做的事不同的事,服务端的Watcher的process()的作用是将path和触发的event发送至客户端,然后再次通过path和event找到watcher执行process(),这时候,执行的代码即为开发者所需要执行的==监听事件对应的应答处理process()==。

    思考 -> 为什么Zookeeper要维护两份Watcher清单(zkWatcherManager + WatcherManager)?

    使用反证法,来证明这样设计的优秀之处。

    • 假设只在客户端维护Watcher清单,当服务端的事件触发之后,服务端没有Watcher清单,不知道是哪几个客户端订阅了这个事件,只能将事件发送给所有的客户端,既浪费了带宽,也浪费了客户端处理响应的资源。
    • 假设只在服务端维护Watcher清单,当服务端的事件触发之后,服务端发送给订阅了该事件的客户端,客户端确会因为没有Watcher对象,而无法执行对应的事件应答处理,导致需要服务端将对应的处理方法,通过网络传递,则会加重网络的传输压力。

    作者:eddieVim
    链接:https://juejin.cn/post/6903693927434420232

    相关文章

      网友评论

        本文标题:阿里P8带你从源码级别——深挖Zookeeper监听机制

        本文链接:https://www.haomeiwen.com/subject/ogwfgktx.html