美文网首页
Zookeeper之JAVA API学习笔记

Zookeeper之JAVA API学习笔记

作者: 我是老薛 | 来源:发表于2018-12-07 20:52 被阅读0次

    Zookeeper提供了多种编程语言API来编写Zookeeper(后面简称zk)客户端程序,典型的如JAVA和C语言。在本文中,我们介绍如何使用Java Api。本文包含如下章节的内容:

    • 开发环境配置客户端
    • 基本流程
    • zk操作
    • 异步API
    • 小结

    参考资料:
    1、如果希望先对Zookeeper组件的基本概念有所了解,可先阅读《Zookeeper学习笔记》

    一、开发环境配置

    利用java api编写zk客户端程序,需要依赖zk的api Jar包。在本案例中要想代码成功编译需要引入zk解压目录下的 zookeeper-3.4.10.jar 。

    因为运行时依赖日志框架,还需要引入zk解压目录下lib目录下的log4j-1.2.16.jar和slf4j-api-1.6.1.jar。这样我们就可以在IDE环境中编译和运行程序了。

    运行编写的zk客户端程序之前,需要先启动zk服务,在开发阶段,可以以独立模式启动zk服务。具体可参见《Zookeeper学习笔记》

    二、基本代码流程

    zk的提供的api中,核心的是ZooKeeper类,该类的对象用于建立和维护与zk服务器的连接,并提供各种操作zk节点的方法。

    编写zk客户端程序,首先要创建ZooKeeper类的实例,ZooKeeper类有多个重构的构造函数,我们先介绍一个基本的构造函数,定义如下:

    public ZooKeeper(String connectString, int sessionTimeout, 
      Watcher watcher)throws IOException;
    

    该构造函数有3个参数,第1个参数是指定zk服务器的地址和端口信息;第2个参数是指定超时时间(单位毫秒);第3个参数是是一个Watcher对象实例。

    Watcher类也是zk提供的Java api中的一个重要类,它是一个接口,用于实现zk的观察机制。在创建ZooKeeper类的实例时,和在调用ZooKeeper类的方法时,可以为znode设置观察触发器,当znode发生变化时,zk的观察机制就会通过触发触发器让客户端得到通知。

    Watcher接口的定义如下:

    public interface Watcher {
        ....
       abstract public void process(WatchedEvent event);
    }
    

    Watcher接口中定义了一个process方法,该方法有一个参数event,通过该参数可获取zk框架回调process方法时传入的通知信息。
    event对应的WatchedEvent类中提供了几个方法可以获取具体的信息,如:
    event.getState()//获取事件状态
    event.getType() //获取事件类型
    event.getPath() //获取事件出发的znode路径

    对于调用ZooKeeper类构造函数传入的Watcher实例,有两个作用。一是当zk客户端与服务器建立链接和断开链接时,该触发器会被触发。另外是作为缺省的触发器,当znode节点被设置了观察但没有设置具体的触发器时,如果该znode节点发生变化,则该缺省触发器会被调用。这个在后面章节会有具体介绍。

    需要注意的是,通过构造函数创建ZooKeeper对象实例时,会启动一个单独的线程连接zk服务。因为调用ZooKeeper类的构造函数是立即返回的,因此,如果返回后立即使用ZooKeeper对象的方法,这时很可能zk对象还未与服务器建立连接(因为是在单独线程中去连接,肯定有时延),这样就会报错。也就是说,必须等该线程成功连接zk服务后,才能使用被创建的ZooKeeper对象。

    那问题是我们怎么知道何时成功连接zk服务器呢?这时创建ZooKeeper类实例时调用的构造函数传入的Watcher对象就发挥作用了,一旦连接成功,zk就会回调Watcher对象中的process方法,这样我们在process方法中就可以判断是否连接成功。

    因为调用ZooKeeper构造函数和process方法被执行是在两个线程中,需要利用信号量来同步两个操作。这时我们可以利用java.util.concurrent包中的CountDownLatch类。

    CountDownLatch类可用于实现同步机制,CountDownLatch对象实例中有一个计数器,当该计数器的值大于0时,调用它的await方法就会堵塞,如果该计数器的值变为0时,await方法就会返回。这样我们可以创建一个全局的CountDownLatch实例,通过构造函数设置对象计数器的初始值设为1,如下面代码:

    CountDownLatch connectedSemaphore= **new** CountDownLatch(1);
    

    然后在创建ZooKeeper对象后调用CountDownLatch的await()方法,这样该语句会被堵塞。并在Watcher的process方法中检测是否有连接成功的通知,如果有,调用CountDownLatch类的countDown()方法让计数器减1变为0,这样前面的await()语句就由堵塞变成不堵塞返回。这时就可以使用被创建的ZooKeeper对象了。

    最后我们不在使用ZooKeeper对象时,要调用ZooKeeper的close方法关闭与zk服务器的连接。

    所以使用zk提供的api编写程序的一个基本模式是:

    主程序中的伪代码:

    {
    设置CountDownLatch对象的计数器为1;
    创建ZooKeeper对象;
    调用CountDownLatch对象的await方法等到连接服务器成功;
    执行zk的操作(如创建znode节点);
    关闭连接;
    }
    

    Watcher接口的process方法中代码:

    {
    判断是否连接服务器成功;
    如果是,设置CountDownLatch对象的计数器为0
    }
    

    下面我们来看具体的一些zk操作例子。

    三、创建znode节点

    创建znode节点可使用ZooKeeper类的create方法,完整的代码例子如下:

    package com.zkexample;
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    public class CreateZnode implements Watcher {
        private ZooKeeper zk;
        private CountDownLatch connectedSemaphore = new CountDownLatch(1);
        
        public static void main(String[] args) throws Exception {
            CreateZnode createZnode = new CreateZnode();
            createZnode.connect();
            createZnode.action();
            createZnode.close();
            System.out.println("create znode success");
        }
        
        public void  connect() throws IOException, InterruptedException{
            zk = new ZooKeeper("localhost:2181",10000,this);
            connectedSemaphore.await();
        }
        
        public void action() throws KeeperException, InterruptedException{
            zk.create("/zoo", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        
        public void close() throws InterruptedException{
            zk.close();
        }
    
        @Override
        public void process(WatchedEvent event) {
            if(event.getState()==KeeperState.SyncConnected){
                connectedSemaphore.countDown();
            }
        }
    }
    

    下面我们来对上面的代码进行解释(部分代码的含义在上一章中已经描述,这里不再重复解释)。

    1、上面代码定义了一个普通的java类(类名为CreateZnode),实现了Watcher接口的process方法,Watcher接口的含义见上一章节中介绍。

    2、CreateZnode 类中有三个方法:
    1)connect方法用于创建ZooKeeper对象,并等待连接成功。
    2)action方法中调用了ZooKeeper类中的create方法来创建znode节点。
    3)close方法中调用了ZooKeeper类中的close方法来关闭zk客户端与zk服务器的连接。

    3、ZooKeeper类中的create方法
    create方法用于创建znode节点,有多个重载方法,上面例子中用到的方法定义如下:

    public String create(final String path, byte data[], List<ACL> acl,
                CreateMode createMode)
            throws KeeperException, InterruptedException;
    

    该方法有4个参数:
    1)第1个参数是待创建的znode的路径
    2)第2个参数是znode存储的数据,本例中传入null
    3)第3个参数是ACL列表,本例传入的常量表示该节点的权限不受控制。
    4)第4个参数是创建的节点类型,本例传入的常量表示创建的是持久节点。

    上面代码的工程只要引入适当的Jar包(见前面章节介绍),就可以直接在IDE中编译和运行。

    四、删除znode节点

    ZooKeeper类提供的delete方法可以删除指定的znode节点。delete方法定义如下:

    void delete(String path, int version) throws InterruptedException, 
       KeeperException;
    

    该方法有两个参数,第1个参数是待删除的znode的路径,第2个参数是版本号。
    1)如果version参数指定的值与待删除的znode的dataVersion不一致,则无法删除,会抛异常。可以传入值-1,则不管版本号是多少都会删除。
    2)如果待删除的znode不存在,则delete方法会抛异常。
    3)如果待删除znode下有子节点,无法删除,会抛异常

    例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):

    public void action() throws KeeperException, InterruptedException{
            zk.delete("/zoo/hello", 0);
        }
    

    上面代码的含义是删除数据版本号为0的节点路径为”/zoo/hello”的znode。
    如果该znode上设置了观察,则执行delete操作后,会触发观察。

    五、设置znode存储的数据

    ZooKeeper类提供的setData方法可以给znode设置要存储的数据。setData方法定义如下:

    Stat setData(String path, byte[] data, int version) throws 
      KeeperException, InterruptedException;
    

    该方法有3个参数,第1个参数是待设置的znode的路径,第2个参数是待设置的内容,第3个参数是版本号。
    如果version参数指定的值与待设置的znode的dataVersion不一致,则无法设置,会抛异常。可以传入值-1,则不管版本号是多少都会被设置。
    如果待设置znode不存在,会抛异常。
    该方法有返回值,返回的是待设置节点的元数据信息。

    例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):

    public void action() throws KeeperException, InterruptedException{
            byte[] data = "adad".getBytes();
            Stat stat = zk.setData("/zoo", data , -1);
        }
    

    如果该znode上设置了观察,则执行setData操作后,会触发观察。

    六、测试znode是否存在

    ZooKeeper类提供的exists方法可以检查指定的znode是否存在,如果不存在,返回null;如果存在返回封装有znode元数据的Stat对象。exists方法有两个重载的方法,分别定义如下:

    public Stat exists(final String path, Watcher watcher)
            throws KeeperException, InterruptedException;
    

    上面方法的第1个参数是待检查znode的路径;第2个参数是设置观察触发器,如果该节点被创建、删除或其数据被修改,设置的观察将被触发。

    public Stat exists(String path, boolean watch) throws 
            KeeperException,InterruptedException;
    

    上面方法的第1个参数是待检查znode的路径;第2个参数是个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。

    七、查看znode子节点

    ZooKeeper类提供的getChildren方法可以获取指定znode下的子节点列表的名称(注意不是绝对路径)以及返回znode的状态。getChildren方法有多个重构的方法,分别定义如下(省略了异常抛出的签名):

    List<String> getChildren(final String path, Watcher watcher);
    
    List<String> getChildren(String path, boolean watch);
    
    List<String> getChildren(final String path, Watcher watcher,Stat stat);
    
    List<String> getChildren(String path, boolean watch, Stat stat);
    

    1)path参数是待查看的znode的路径。
    2)wathcer参数是为该znode设置观察触发器,如果该znode被删除,或znode有子节点被创建和删除,则该wathcer会被触发。
    3)watch参数是一个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。
    4)stat参数是用于返回该znode的元数据信息。

    注意,如果传入的znode路径不存在,则getChildren方法会抛出异常。

    例子代码如下(与上面例子相比,改变的只是action方法,因此下面只是给出了部分代码):

    public void action() throws KeeperException, InterruptedException{
            List<String> childern = zk.getChildren("/", false);
            for(String path:childern){
                System.out.println(path);
            }
        }
    

    八、获取znode存储的数据

    ZooKeeper类提供的getData方法可以获取指定znode存储的数据以及返回znode的状态信息。getDatashuju方法有多个重构的方法,分别定义如下(省略了异常抛出的签名):

    byte[] getData(final String path, Watcher watcher, Stat stat);
    byte[] getData(String path, boolean watch, Stat stat);
    

    1)path参数是待获取的znode的路径。
    2)wathcer参数是为该znode设置观察触发器,如果该znode被删除,或其数据被更新,则该wathcer会被触发。
    3)watch参数是一个布尔值,如果为true,则为该节点设置观察,触发器为默认的触发器(即调用构造函数创建ZooKeeper对象时传入的触发器)。
    4)stat参数是用于返回该znode的元数据信息。

    注意,如果传入的znode路径不存在,则getData方法会抛出异常。

    九、异步API

    上面介绍的API都是同步的,即操作的返回结果通过API调用的返回值或参数直接返回。不过zk提供的API还提供了异步操作的API。因为所有异步操作的结果是通过回调函数来传送的,因此异步API方法的返回类型都是void,且没有异常抛出,错误信息通过回调函数的参数获取。

    下面我们通过exists方法来举例说明,异步exists方法的定义如下:

    void exists(String path, boolean watch, StatCallback cb, Object ctx);
    

    该方法有4个参数,含义如下:
    1)path参数,待操作的znode的路径
    2)watch参数,表示在节点上是否设置观察。具体含义同前面的介绍
    3)cb参数,回调函数,下面再详细介绍
    4)ctx参数,调用exists方法额外传入的参数,可用于被在回调函数中区分不同请求。

    异步方法传入的回调是StatCallback 接口的实现,StatCallback 接口的定义如下:

    interface StatCallback extends AsyncCallback {
      public void processResult(int rc, String path,    Object ctx, Stat stat);
    }
    

    processResult就是被回调的方法,该方法有4个参数,含义如下:
    1)rc参数表示zx操作出错的异常错误码,如果为0,表示操作成功,每个非0值,代表一种异常。
    2)path参数,对应调用exists方法传入的参数,可用于识别这个回调所响应的请求。
    3)ctx参数,也是对应调用exists方法传入的参数,可用于在path不足识别请求时额外传入的值,如果不需要,调用exists方法时可传入null。
    4)stat参数,查询获取的结果,类似同步exists方法的返回值。

    需要注意的是,调用异步方法时(如调用异步的exists方法),异步方法会立即返回。StatCallback 接口中的processResult方法被回调是在另外的线程中处理的。

    下面看下例子代码:

    public void action(){
            zk.exists("/zoo", false, new StatCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, Stat stat) {
                    System.out.println(stat.getCtime());
                }
            }, null);
        }
    

    十、小结

    上面我们介绍了zk提供的JAVA api,用于编写zk客户端程序。

    我们可以看出,zk客户端程序调用exists,getData,getChildren方法时可以为节点设置观察触发器。当该节点被zk客户端程序执行create,delete,setData方法时,节点上设置的触发器会被触发。

    除了上述常见的API外,还有获取和设置znode的ACL(访问控制列表)信息,这个不在文本介绍。

    相关文章

      网友评论

          本文标题:Zookeeper之JAVA API学习笔记

          本文链接:https://www.haomeiwen.com/subject/ulvxhqtx.html