美文网首页
使用ZooKeeper提供的原生Java API操作ZooKee

使用ZooKeeper提供的原生Java API操作ZooKee

作者: 端碗吹水 | 来源:发表于2019-11-17 17:17 被阅读0次

    建立客户端与zk服务端的连接

    我们先来创建一个普通的maven工程,然后在pom.xml文件中配置zookeeper依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.11</version>
        </dependency>
    </dependencies>
    

    在resources目录下创建一个zk-connect.properties属性配置文件,我们在该文件中填写连接zookeeper服务器的一些配置信息。如下:

    # zk.zkServerIp=192.168.190.129:2181  单机模式
    zk.zkServerIps=192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181
    zk.timeout=5000
    

    注:我这里使用的集群模式,所以是多个IP。

    zookeeper使用的是log4j作为日志打印工具,所以我们还需要在resources目录下创建log4j的

    log4j.rootLogger=WARN,console
    
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.encoding=UTF-8
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n
    

    然后创建一个连接类demo,代码如下:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.*;
    import java.util.Properties;
    
    /**
     * @Description: zookeeper 连接demo演示
     */
    public class ZKConnect implements Watcher {
    
        private static final Logger logger = LoggerFactory.getLogger(ZKConnect.class);
    
        // private static String zkServerIp;  单机模式是一个ip
    
        // 集群模式则是多个ip
        private static String zkServerIps;
    
        // 连接超时时间
        private static Integer timeout;
    
        // 加载配置信息
        static {
            Properties properties = new Properties();
            InputStream inputStream = Object.class.getResourceAsStream("/zk-connect.properties");
            try {
                properties.load(inputStream);
    
                // zkServerIp = properties.getProperty("zk.zkServerIp");
                zkServerIps = properties.getProperty("zk.zkServerIps");
                timeout = Integer.parseInt(properties.getProperty("zk.timeout"));
            } catch (Exception e) {
                logger.error("配置文件读取异常", e);
            } finally {
                try {
                    if (inputStream != null) {
                        inputStream.close();
                    }
                } catch (IOException e) {
                    logger.error("关闭流失败", e);
                }
            }
        }
    
        // Watch事件通知
        public void process(WatchedEvent watchedEvent) {
            logger.warn("接收到watch通知:{}", watchedEvent);
        }
    
        public static void main(String[] args) throws IOException, InterruptedException {
            /**
             * 客户端和zk服务端链接是一个异步的过程
             * 当连接成功后后,客户端会收的一个watch通知
             *
             * 参数:
             * connectString:连接服务器的ip字符串,
             *      比如: "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181"
             *      可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
             *      也可以在ip后加路径
             * sessionTimeout:超时时间,心跳收不到了,那就超时
             * watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null
             * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
             *                         此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
             * sessionId:会话的id
             * sessionPasswd:会话密码   当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话
             */
    
            // 实例化zookeeper客户端
            ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnect());
    
            logger.warn("客户端开始连接zookeeper服务器...");
            logger.warn("连接状态:{}", zooKeeper.getState());
    
            // 避免发出连接请求就断开,不然就无法正常连接也无法获取watch事件的通知
            Thread.sleep(2000);
    
            logger.warn("连接状态:{}", zooKeeper.getState());
        }
    }
    

    运行该类后,控制台输出日志信息如下:

    2018-04-25 10:41:32,488 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:76)] - [WARN] 客户端开始连接zookeeper服务器...
    2018-04-25 10:41:32,505 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:77)] - [WARN] 连接状态:CONNECTING
    2018-04-25 10:41:32,515 [main-EventThread] [org.zero01.zk.demo.ZKConnect.process(ZKConnect.java:52)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
    2018-04-25 10:41:34,507 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:81)] - [WARN] 连接状态:CONNECTED
    

    这样,我们就完成了一个与zookeeper服务端建立连接的过程。


    zk会话重连机制

    上一节我们简单演示了如何去连接zk服务端,本节则介绍一下,如何通过sessionid和session密码去恢复上一次的会话,也就是zk的会话重连机制。

    新建一个类,用做于演示zk会话重连机制的demo:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * @program: zookeeper-connection
     * @description: zookeeper 恢复之前的会话连接demo演示
     * @author: 01
     * @create: 2018-04-25 12:59
     **/
    public class ZKConnectSessionWatcher implements Watcher {
    
        private static final Logger logger = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);
    
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
        // 超时时间
        private static final Integer timeout = 5000;
    
        // Watch事件通知
        public void process(WatchedEvent watchedEvent) {
            logger.warn("接收到watch通知:{}", watchedEvent);
        }
    
        public static void main(String[] args) throws IOException, InterruptedException {
            // 实例化zookeeper客户端
            ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher());
    
            logger.warn("客户端开始连接zookeeper服务器...");
            logger.warn("连接状态:{}", zooKeeper.getState());
            Thread.sleep(2000);
            logger.warn("连接状态:{}", zooKeeper.getState());
    
            // 记录本次会话的sessionId
            long sessionId = zooKeeper.getSessionId();
            // 转换成16进制进行打印
            logger.warn("sid:{}", "0x" + Long.toHexString(sessionId));
            // 记录本次会话的session密码
            byte[] sessionPassword = zooKeeper.getSessionPasswd();
    
            Thread.sleep(200);
    
            // 开始会话重连
            logger.warn("开始会话重连...");
            // 加上sessionId和password参数去实例化zookeeper客户端
            ZooKeeper zkSession = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher(), sessionId, sessionPassword);
            logger.warn("重新连接状态zkSession:{}", zkSession.getState());
            Thread.sleep(2000);
            logger.warn("重新连接状态zkSession:{}", zkSession.getState());
        }
    }
    

    运行该类,控制台输出日志结果如下:

    2018-04-25 13:48:00,931 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:35)] - [WARN] 客户端开始连接zookeeper服务器...
    2018-04-25 13:48:00,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:36)] - [WARN] 连接状态:CONNECTING
    2018-04-25 13:48:00,951 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
    2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:38)] - [WARN] 连接状态:CONNECTED
    2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:43)] - [WARN] sid:0x10000e81cfa0002
    2018-04-25 13:48:03,136 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:50)] - [WARN] 开始会话重连...
    2018-04-25 13:48:03,137 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:53)] - [WARN] 重新连接状态zkSession:CONNECTING
    2018-04-25 13:48:03,142 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
    2018-04-25 13:48:05,140 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:55)] - [WARN] 重新连接状态zkSession:CONNECTED
    

    同步/异步创建zk节点

    以上我们介绍了如何去连接和重连zk服务端,既然知道如何连接zk服务端之后,我们来看一下如何,同步或异步去创建zk节点。

    先演示同步创建zk节点的方式,创建一个demo类如下:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.ACL;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.List;
    
    /**
     * @program: zookeeper-connection
     * @description:  演示同步异步创建zk节点
     * @author: 01
     * @create: 2018-04-25 13:51
     **/
    public class ZkNodeOperator implements Watcher {
    
        private static final Logger logger = LoggerFactory.getLogger(ZkNodeOperator.class);
    
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
        // 超时时间
        private static final Integer timeout = 5000;
        private ZooKeeper zooKeeper;
    
        public ZooKeeper getZooKeeper() {
            return zooKeeper;
        }
    
        public void setZooKeeper(ZooKeeper zooKeeper) {
            this.zooKeeper = zooKeeper;
        }
    
        public ZkNodeOperator() {
        }
    
        public ZkNodeOperator(String connectStr) {
            try {
                // 在使用该构造器的时候,实例化zk客户端对象
                zooKeeper = new ZooKeeper(connectStr, timeout, new ZkNodeOperator());
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    if (zooKeeper != null) {
                        zooKeeper.close();
                    }
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    
        // Watch事件通知方法
        public void process(WatchedEvent watchedEvent) {
            logger.warn("接收到watch通知:{}", watchedEvent);
        }
    
        /**
         * @Title: ZKOperatorDemo.java
         * @Description: 创建zk节点
         */
        public void createZKNode(String path, byte[] data, List<ACL> acls) {
            String result = "";
            try {
                /**
                 * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
                 * 参数:
                 * path:节点创建的路径
                 * data:节点所存储的数据的byte[]
                 * acl:控制权限策略
                 *          Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
                 *          CREATOR_ALL_ACL --> auth:user:password:cdrwa
                 * createMode:节点类型, 是一个枚举
                 *          PERSISTENT:持久节点
                 *          PERSISTENT_SEQUENTIAL:持久顺序节点
                 *          EPHEMERAL:临时节点
                 *          EPHEMERAL_SEQUENTIAL:临时顺序节点
                 */
                // 同步创建zk节点,节点类型为临时节点
                result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
                System.out.println("创建节点:\t" + result + "\t成功...");
                Thread.sleep(2000);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            ZkNodeOperator zkServer = new ZkNodeOperator(zkServerIps);
    
            // 创建zk节点
            zkServer.createZKNode("/testNode", "testNode-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
        }
    }
    

    运行该类,到服务器上查看是否已创建成功。如下,我这里是创建成功的:

    [root@zk001 ~]# zkCli.sh
    [zk: localhost:2181(CONNECTED) 7] ls /
    [zookeeper, data, real-culster, testNode]
    [zk: localhost:2181(CONNECTED) 8] ls /
    [zookeeper, data, real-culster]  # 因为是临时节点,所以客户端断开之后就消失了
    [zk: localhost:2181(CONNECTED) 9] quit
    [root@zk001 ~]# 
    

    控制台输出的日志信息如下:

    2018-04-25 14:16:47,726 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
    创建节点:   /testNode   成功...
    

    接下来我们演示一下异步创建zk节点的方式,因为异步创建有一个回调函数,所以我们得先创建一个类,实现StringCallback接口里面的回调方法:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.AsyncCallback.StringCallback;
    
    public class CreateCallBack implements StringCallback {
    
        // 回调函数
        public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("创建节点:" + path);
            System.out.println((String) ctx);
        }
    }
    

    修改 ZkNodeOperator 类中的 createZKNode 方法代码如下:

    ...
    public class ZkNodeOperator implements Watcher {
        ...
        /**
         * @Title: ZKOperatorDemo.java
         * @Description: 创建zk节点
         */
        public void createZKNode(String path, byte[] data, List<ACL> acls) {
            try {
                ...
                // 异步步创建zk节点,节点类型为持久节点
                String ctx = "{'create':'success'}";
                zooKeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);
    
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
    

    运行该类,然后到服务器上查看是否已创建成功。如下,我这里是创建成功的:

    [zk: localhost:2181(CONNECTED) 9] ls /
    [zookeeper, data, real-culster, testNode]
    [zk: localhost:2181(CONNECTED) 10] get /testNode
    testNode-data
    cZxid = 0x700000014
    ctime = Wed Apr 25 22:17:26 CST 2018
    mZxid = 0x700000014
    mtime = Wed Apr 25 22:17:26 CST 2018
    pZxid = 0x700000014
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 13
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 11] 
    

    控制台输出的日志信息如下:

    2018-04-25 14:25:14,923 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
    创建节点:/testNode
    {'create':'success'}
    

    同步/异步修改zk节点数据

    同样的,我们也可以通过Zookeeper提供的Java API去修改zk节点的数据,也是有同步和异步两种方式,先来演示同步的方式。创建一个新的类,代码如下:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * @program: zookeeper-connection
     * @description: 修改zk节点数据演示
     * @author: 01
     * @create: 2018-04-25 16:25
     **/
    public class ZKNodeAlterOperator implements Watcher {
    
        private static final Logger logger = LoggerFactory.getLogger(ZKNodeAlterOperator.class);
    
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
        // 超时时间
        private static final Integer timeout = 5000;
        private ZooKeeper zooKeeper;
    
        public ZooKeeper getZooKeeper() {
            return zooKeeper;
        }
    
        public void setZooKeeper(ZooKeeper zooKeeper) {
            this.zooKeeper = zooKeeper;
        }
        
        public ZKNodeAlterOperator() {
        }
        
        public ZKNodeAlterOperator(String connectStr) {
            try {
                // 在使用该构造器的时候,实例化zk客户端对象
                zooKeeper = new ZooKeeper(connectStr, timeout, new ZKNodeAlterOperator());
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    if (zooKeeper != null) {
                        zooKeeper.close();
                    }
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
    
        // Watch事件通知方法
        public void process(WatchedEvent watchedEvent) {
            logger.warn("接收到watch通知:{}", watchedEvent);
        }
    
        public static void main(String[] args) throws KeeperException, InterruptedException {
            ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);
    
            /**
             * 修改zk节点数据(同步)
             * 参数:
             * path:节点路径
             * data:新数据
             * version 数据版本
             */
            Stat status = zkServer.getZooKeeper().setData("/testNode", "this is new data".getBytes(), 0);
            // 通过Stat对象可以获取znode所有的状态属性,这里以version为例
            System.out.println("修改成功,当前数据版本为:" + status.getVersion());
        }
    }
    

    运行该类,到服务器上查看节点是否已成功修改数据。如下,我这里是修改成功的:

    [zk: localhost:2181(CONNECTED) 12] get /testNode
    this is new data
    cZxid = 0x700000014
    ctime = Wed Apr 25 22:17:26 CST 2018
    mZxid = 0x700000017
    mtime = Thu Apr 26 00:21:54 CST 2018
    pZxid = 0x700000014
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 16
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 13] 
    

    控制台输出的日志信息如下:

    2018-04-25 16:30:02,111 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:57)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
    修改成功,当前数据版本为:1
    

    接下来演示一下异步修改zk节点数据的方式,和异步创建节点是几乎一样的。也是需要新建一个类来实现回调接口的方法,只不过接口不一样而已。如下:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.AsyncCallback.StatCallback;
    import org.apache.zookeeper.data.Stat;
    
    public class AlterCallBack implements StatCallback {
    
        // 回调函数
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            System.out.println("修改节点:" + path + "成功...");
            // 通过Stat对象可以获取znode所有的状态属性,这里以version为例
            System.out.println("当前数据版本为:" + stat.getVersion());
            System.out.println((String) ctx);
        }
    }
    

    然后修改 ZKNodeAlterOperator 类中的main方法代码如下:

    ...
    public class ZKNodeAlterOperator implements Watcher {
        ...
        public static void main(String[] args) throws KeeperException, InterruptedException {
            ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);
    
            /**
             * 修改zk节点数据(异步)
             * 参数:
             * path:节点路径
             * data:新数据
             * version: 数据版本
             * sc:实现回调函数的对象
             * ctx:给回调函数的上下文
             */
            String ctx = "{'alter':'success'}";
            zkServer.getZooKeeper().setData("/testNode", "asynchronous-data".getBytes(), 0, new AlterCallBack(), ctx);
            
            Thread.sleep(2000);
        }
    }
    

    运行该类,到服务器上查看节点是否已成功修改数据。如下,我这里是修改成功的:

    [zk: localhost:2181(CONNECTED) 16] get /testNode
    asynchronous-data
    cZxid = 0x700000014
    ctime = Wed Apr 25 22:17:26 CST 2018
    mZxid = 0x70000001a
    mtime = Thu Apr 26 00:35:53 CST 2018
    pZxid = 0x700000014
    cversion = 0
    dataVersion = 2
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 17
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 17] 
    

    控制台输出的日志信息如下:

    2018-04-25 16:44:03,472 [main-EventThread] [org.zero01.zk.demo.ZKNodeAlterOperator.process(ZKNodeAlterOperator.java:58)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
    修改节点:/testNode成功...
    当前数据版本为:2
    {'alter':'success'}
    

    同步/异步删除zk节点

    同样的,删除节点也有同步和异步两种方式,在删除节点操作上,使用异步会更人性化一些,因为有回调通知,同步的方式,除了设置了watch事件,不然是没有通知的。我们先来看一下同步方式的删除节点,代码如下:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ZKNodeDeleteOperator implements Watcher {
    
        private static final Logger logger = LoggerFactory.getLogger(ZKNodeDeleteOperator.class);
    
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
        // 超时时间
        private static final Integer timeout = 5000;
        private static ZooKeeper zooKeeper;
    
        // Watch事件通知方法
        public void process(WatchedEvent watchedEvent) {
            logger.warn("接收到watch通知:{}", watchedEvent);
        }
    
        public static void main(String[] args) throws Exception {
            zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
            // 创建节点
            zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            Thread.sleep(1000);
            /**
             * 删除节点(同步)
             * 参数:
             * path:需要删除的节点路径
             * version:数据版本
             */
            zooKeeper.delete("/testDeleteNode", 0);
    
            zooKeeper.close();
        }
    }
    

    由于同步的删除方法不会有返回值,所以我们无法在控制台输出内容。

    然后再来看一下异步方式的删除节点,首先需要新建一个类实现回调接口的方法:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.AsyncCallback.VoidCallback;
    
    public class DeleteCallBack implements VoidCallback {
    
        // 回调函数
        public void processResult(int rc, String path, Object ctx) {
            System.out.println("删除节点:" + path + " 成功...");
            System.out.println((String) ctx);
        }
    }
    

    然后修改一下 ZKNodeDeleteOperator 类的main方法:

    public class ZKNodeDeleteOperator implements Watcher {
        public static void main(String[] args) throws Exception {
            zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
            // 创建节点
            zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            Thread.sleep(1000);
            /**
             * 删除节点(异步)
             * 参数:
             * path:需要删除的节点路径
             * version:数据版本
             * sc:实现回调函数的对象
             * ctx:给回调函数的上下文
             */
            String ctx = "{'delete':'success'}";
            zooKeeper.delete("/testDeleteNode", 0, new DeleteCallBack(), ctx);
            Thread.sleep(2000);
            zooKeeper.close();
        }
    }
    

    运行该类,控制台输出结果如下:

    删除节点:/testDeleteNode 成功...
    {'delete':'success'}
    

    获取zk节点数据

    以上小节介绍完了增删改,现在就剩下查了。同样的查询也有同步和异步两种方式,异步的方式在之前的增删改例子中已经都介绍过了,在查询里使用异步也是和增删改同样的方式,所以就不再演示查询的异步了。zk中有三种数据可以查询:查询zk节点数据、查询zk子节点列表、查询某个zk节点是否存在。本节先介绍如何查询zk节点数据。

    现在zookeeper服务器上,有一个/testNode节点。节点数据内容如下:

    [zk: localhost:2181(CONNECTED) 3] get /testNode
    asynchronous-data
    ...
    [zk: localhost:2181(CONNECTED) 4] 
    

    然后我们来编写一个 ZKGetNodeData 类,调用zookeeper的API去获取zk节点数据。代码示例:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @program: zookeeper-connection
     * @description: 获取zk节点数据demo
     * @author: 01
     * @create: 2018-04-26 18:05
     **/
    public class ZKGetNodeData implements Watcher {
    
        private static final Logger logger = LoggerFactory.getLogger(ZKGetNodeData.class);
    
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
        // 超时时间
        private static final Integer timeout = 5000;
        private static ZooKeeper zooKeeper;
        private static Stat stat = new Stat();
    
        // Watch事件通知方法
        public void process(WatchedEvent watchedEvent) {
            logger.warn("接收到watch通知:{}", watchedEvent);
        }
    
        public static void main(String[] args) throws Exception {
            zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());
    
            /**
             * 参数:
             * path:节点路径
             * watch:true或者false,注册一个watch事件
             * stat:状态,我们可以通过这个对象获取节点的状态信息
             */
            byte[] resByte = zooKeeper.getData("/testNode", true, stat);
            String result = new String(resByte);
            System.out.println("/testNode 节点的数据: " + result);
    
            zooKeeper.close();
        }
    }
    

    控制台输出结果如下:

    /testNode 节点的值: asynchronous-data
    

    通过实现 Watcher 接口的通知方法,再结合这个获取节点数据的API,我们就可以在数据发生改变的时候获取最新的数据。如下示例,在 ZKGetNodeData 类中,增加代码如下:

    ...
    public class ZKGetNodeData implements Watcher {
        ...
        // 计数器
        private static CountDownLatch countDownLatch = new CountDownLatch(1);
    
        // Watch事件通知方法
        public void process(WatchedEvent watchedEvent) {
            try {
                if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                    ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());
                    byte[] resByte = zooKeeper.getData("/testNode", false, stat);
                    String result = new String(resByte);
                    System.out.println("/testNode 节点的数据发生了变化");
                    System.out.println("新的数据为: " + result);
                    System.out.println("新的数据版本号为:" + stat.getVersion());
    
                    // 通知完之后,计数器减一
                    countDownLatch.countDown();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws Exception {
            ...
            // 等待线程执行
            countDownLatch.await();
        }
    }
    

    这时候由于我们在main里调用了await()方法,所以主线程会阻塞。然后我们到zookeeper服务器上,对该节点的数据进行操作,如下:

    [zk: localhost:2181(CONNECTED) 11] get /testNode
    asynchronous-data
    cZxid = 0x700000014
    ctime = Wed Apr 25 22:17:26 CST 2018
    mZxid = 0x800000011
    mtime = Fri Apr 27 03:04:09 CST 2018
    pZxid = 0x700000014
    cversion = 0
    dataVersion = 6
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 17
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 12] set /testNode new-data 6       
    cZxid = 0x700000014
    ctime = Wed Apr 25 22:17:26 CST 2018
    mZxid = 0x800000013
    mtime = Fri Apr 27 03:04:35 CST 2018
    pZxid = 0x700000014
    cversion = 0
    dataVersion = 7
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 8
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 13]
    

    当我们修改了数据之后,控制台就会输出如下内容,主线程就会解除阻塞结束执行:

    /testNode 节点的数据: asynchronous-data
    /testNode 节点的数据发生了变化
    新的数据为: new-data
    新的数据版本号为:7
    

    获取zk子节点列表

    本节介绍一下如何获取zk子节点列表,同样的也是有同步和异步两种方式,这里介绍的是同步的。testNode节点下有三个节点,如下:

    [zk: localhost:2181(CONNECTED) 20] ls /testNode
    [ThreeNode, TwoNode, OneNode]
    [zk: localhost:2181(CONNECTED) 21] 
    

    我们来写一个demo获取这个节点下的子节点列表。代码如下:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.util.List;
    
    /**
     * @program: zookeeper-connection
     * @description:  zookeeper 获取子节点数据的demo演示
     * @author: 01
     * @create: 2018-04-26 21:13
     **/
    public class ZKGetChildrenList implements Watcher{
    
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
        // 超时时间
        private static final Integer timeout = 5000;
        private static ZooKeeper zooKeeper;
    
        // Watch事件通知方法
        public void process(WatchedEvent watchedEvent) {
        }
    
        public static void main(String[] args) throws Exception {
            zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetChildrenList());
    
            /**
             * 参数:
             * path:父节点路径
             * watch:true或者false,注册一个watch事件
             */
            List<String> strChildList = zooKeeper.getChildren("/testNode", false);
            for (String s : strChildList) {
                System.out.println(s);
            }
        }
    }
    

    控制台就会输出内容如下:

    ThreeNode
    TwoNode
    OneNode
    

    判断zk节点是否存在

    最后介绍如何判断一个zk节点是否存在,同样的也是有同步和异步两种方式,这里介绍的是同步的。我们来写一个demo判断某个zk节点是否存在。代码如下:

    package org.zero01.zk.demo;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * @program: zookeeper-connection
     * @description: zookeeper 判断节点是否存在demo
     * @author: 01
     * @create: 2018-04-26 22:06
     **/
    public class ZKNodeExist implements Watcher {
    
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
        // 超时时间
        private static final Integer timeout = 5000;
        private static ZooKeeper zooKeeper;
    
        public void process(WatchedEvent watchedEvent) {
        }
    
        public static void main(String[] args) throws Exception {
            zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeExist());
    
            /**
             * 参数:
             * path:节点路径
             * watch:true或者false,注册一个watch事件
             */
            Stat stat = zooKeeper.exists("/testNode", true);
            if (stat != null) {
                System.out.println("testNode 节点存在...");
                System.out.println("该节点的数据版本为:" + stat.getVersion());
            } else {
                System.out.println("该节点不存在...");
            }
        }
    }
    

    运行该类,控制台输出如下:

    testNode 节点存在...
    该节点的数据版本为:7
    

    将testNode换成一个不存在的节点,

    该节点不存在...
    

    相关文章

      网友评论

          本文标题:使用ZooKeeper提供的原生Java API操作ZooKee

          本文链接:https://www.haomeiwen.com/subject/lropictx.html