美文网首页
使用Apache Curator操作ZooKeeper

使用Apache Curator操作ZooKeeper

作者: 端碗吹水 | 来源:发表于2019-11-17 17:22 被阅读0次

    curator简介与客户端之间的异同点

    常用的zookeeper java客户端:

    • zookeeper原生Java API
    • zkclient
    • Apache curator

    ZooKeeper原生Java API的不足之处:

    • 在连接zk超时的时候,不支持自动重连,需要手动操作
    • Watch注册一次就会失效,需要反复注册
    • 不支持递归创建节点

    Apache curator:

    • Apache 的开源项目
    • 解决Watch注册一次就会失效的问题
    • 提供的 API 更加简单易用
    • 提供更多解决方案并且实现简单,例如:分布式锁
    • 提供常用的ZooKeeper工具类
    • 编程风格更舒服,

    搭建maven工程,建立curator与zkserver的连接

    创建一个普通的maven工程,在pom.xml文件中,配置如下依赖:

        <dependencies>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.11</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.5</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.7.4</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.7.4</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>2.7.4</version>
            </dependency>
        </dependencies>
    

    配置完依赖后,我们就可以来写一个简单的demo测试与zookeeper服务端的连接。代码如下:

    package org.zero01.zk.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    /**
     * @program: zookeeper-connection
     * @description: 建立curator与zkserver的连接演示demo
     * @author: 01
     * @create: 2018-04-28 09:44
     **/
    public class CuratorConnect {
    
        // Curator客户端
        public CuratorFramework client = null;
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    
        public CuratorConnect(){
            /**
             * 同步创建zk示例,原生api是异步的
             * 这一步是设置重连策略
             *
             * ExponentialBackoffRetry构造器参数:
             *  curator链接zookeeper的策略:ExponentialBackoffRetry
             *  baseSleepTimeMs:初始sleep的时间
             *  maxRetries:最大重试次数
             *  maxSleepMs:最大重试时间
             */
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
    
            // 实例化Curator客户端,Curator的编程风格可以让我们使用方法链的形式完成客户端的实例化
            client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
                    .connectString(zkServerIps)  // 放入zookeeper服务器ip
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 设定会话时间以及重连策略
                    .build();  // 建立连接通道
    
            // 启动Curator客户端
            client.start();
        }
    
        // 关闭zk客户端连接
        private void closeZKClient() {
            if (client != null) {
                this.client.close();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            Thread.sleep(1000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    控制台输出信息如下:

    当前客户端的状态:连接中...
    当前客户端的状态:已关闭...
    

    curator连接zookeeper服务器时有自动重连机制,而curator的重连策略有五种。第一种就是我们以上demo中使用到的:

    /**
     * (推荐)
     * 同步创建zk示例,原生api是异步的
     * 这一步是设置重连策略
     * 
     * 构造器参数:
     *  curator链接zookeeper的策略:ExponentialBackoffRetry
     *  baseSleepTimeMs:初始sleep的时间
     *  maxRetries:最大重试次数
     *  maxSleepMs:最大重试时间
     */
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
    

    第二种,可设定重连n次:

    /**
     * (推荐)
     * curator链接zookeeper的策略:RetryNTimes
     * 
     * 构造器参数:
     * n:重试的次数
     * sleepMsBetweenRetries:每次重试间隔的时间
     */
    RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
    

    第三种,只会重连一次:

    /**
     * (不推荐)
     * curator链接zookeeper的策略:RetryOneTime
     * 
     * 构造器参数:
     * sleepMsBetweenRetry:每次重试间隔的时间
     * 这个策略只会重试一次
     */
    RetryPolicy retryPolicy2 = new RetryOneTime(3000);
    

    第四种,永远重连:

    /**
     * 永远重试,不推荐使用
     */
    RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
    

    第五种,可设定最大重试时间:

    /**
     * curator链接zookeeper的策略:RetryUntilElapsed
     * 
     * 构造器参数:
     * maxElapsedTimeMs:最大重试时间
     * sleepMsBetweenRetries:每次重试间隔
     * 重试时间超过maxElapsedTimeMs后,就不再重试
     */
    RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
    

    zk命名空间以及创建节点

    zookeeper的命名空间就类似于我们平时使用Eclipse等开发工具的工作空间一样,我们该连接中所有的操作都是基于这个命名空间的。curator提供了设置命名空间的方法,这样我们任何的连接都可以去设置一个命名空间。设置了命名空间并成功连接后,zookeeper的根节点会多出一个以命名空间名称所命名的节点。然后我们在该连接的增删查改等操作都会在这个节点中进行。例如,现在zookeeper服务器上只有以下几个节点:

    [zk: localhost:2181(CONNECTED) 0] ls /
    [zookeeper, data, real-culster, testDigestNode]
    [zk: localhost:2181(CONNECTED) 1]
    

    然后我们来将之前的demo修改一下,加上设置命名空间的代码以及创建节点的代码,以此来做一个简单的演示,修改之前的demo代码如下:

    package org.zero01.zk.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    
    public class CuratorConnect {
    
        // Curator客户端
        public CuratorFramework client = null;
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    
        public CuratorConnect() {
    
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
    
            // 实例化Curator客户端
            client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
                    .connectString(zkServerIps)  // 放入zookeeper服务器ip
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 设定会话时间以及重连策略
                    .namespace("workspace").build();  // 设置命名空间以及开始建立连接
    
            // 启动Curator客户端
            client.start();
        }
    
        // 关闭zk客户端连接
        private void closeZKClient() {
            if (client != null) {
                this.client.close();
            }
        }
    
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 创建节点
            String nodePath = "/super/testNode";  // 节点路径
            byte[] data = "this is a test data".getBytes();  // 节点数据
            String result = curatorConnect.client.create().creatingParentsIfNeeded()  // 创建父节点,也就是会递归创建
                    .withMode(CreateMode.PERSISTENT)  // 节点类型
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)  // 节点的acl权限
                    .forPath(nodePath, data);
    
            System.out.println(result + "节点,创建成功...");
    
            Thread.sleep(1000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    运行该类,控制台输出信息如下:

    当前客户端的状态:连接中...
    /super/testNode节点,创建成功...
    当前客户端的状态:已关闭...
    

    到服务器上,查看是否多了一个 workspace 节点,并且我们创建的节点都在这个节点下:

    [zk: localhost:2181(CONNECTED) 12] ls /
    [workspace, zookeeper, data, real-culster, testDigestNode]
    [zk: localhost:2181(CONNECTED) 13] ls /workspace
    [super]
    [zk: localhost:2181(CONNECTED) 14] ls /workspace/super
    [testNode]
    [zk: localhost:2181(CONNECTED) 15] get /workspace/super/testNode
    this is a test data
    cZxid = 0xb0000000f
    ctime = Sat Apr 28 18:56:36 CST 2018
    mZxid = 0xb0000000f
    mtime = Sat Apr 28 18:56:36 CST 2018
    pZxid = 0xb0000000f
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 19
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 18] getAcl /workspace/super/testNode
    'world,'anyone
    : cdrwa
    [zk: localhost:2181(CONNECTED) 19] 
    

    修改节点以及删除节点

    上一节中,我们介绍了如何创建节点,本节我们来简单演示一下如何修改节点的数据以及删除节点。修改 CuratorConnect 类代码如下:

    package org.zero01.zk.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.Stat;
    
    public class CuratorConnect {
    
        // Curator客户端
        public CuratorFramework client = null;
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    
        public CuratorConnect() {
    
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
    
            // 实例化Curator客户端
            client = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
                    .connectString(zkServerIps)  // 放入zookeeper服务器ip
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 设定会话时间以及重连策略
                    .namespace("workspace").build();  // 设置命名空间以及开始建立连接
    
            // 启动Curator客户端
            client.start();
        }
    
        // 关闭zk客户端连接
        private void closeZKClient() {
            if (client != null) {
                this.client.close();
            }
        }
    
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 节点路径
            String nodePath = "/super/testNode";
    
            // 更新节点数据
            byte[] newData = "this is a new data".getBytes();
            Stat resultStat = curatorConnect.client.setData().withVersion(0)  // 指定数据版本
                    .forPath(nodePath, newData);  // 需要修改的节点路径以及新数据
    
            System.out.println("更新节点数据成功,新的数据版本为:" + resultStat.getVersion());
    
            // 删除节点
            curatorConnect.client.delete()
                    .guaranteed()  // 如果删除失败,那么在后端还是会继续删除,直到成功
                    .deletingChildrenIfNeeded()  // 子节点也一并删除,也就是会递归删除
                    .withVersion(resultStat.getVersion())
                    .forPath(nodePath);
    
            Thread.sleep(1000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    运行该类,控制台输出信息如下:

    当前客户端的状态:连接中...
    更新节点数据成功,新的数据版本为:1
    当前客户端的状态:已关闭...
    

    此时到zookeeper服务器上,可以看到,节点已经被成功删除了:

    [zk: localhost:2181(CONNECTED) 19] ls /workspace/super
    []
    [zk: localhost:2181(CONNECTED) 20] 
    

    查询节点相关信息

    1.获取某个节点的数据,现有一个节点的数据如下:

    [zk: localhost:2181(CONNECTED) 22] get /workspace/super/testNode    
    test-data
    cZxid = 0xb00000015
    ctime = Sat Apr 28 20:59:57 CST 2018
    mZxid = 0xb00000015
    mtime = Sat Apr 28 20:59:57 CST 2018
    pZxid = 0xb00000015
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 9
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 23] 
    

    修改 CuratorConnect 类中的main方法代码如下,一些重复的代码就忽略了:

    ...
    public class CuratorConnect {
        ...
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 节点路径
            String nodePath = "/super/testNode";
    
            // 读取节点数据
            Stat stat = new Stat();
            byte[] nodeData = curatorConnect.client.getData().storingStatIn(stat).forPath(nodePath);
            System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData));
            System.out.println("该节点的数据版本号为:" + stat.getVersion());
    
            Thread.sleep(1000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    运行该类,控制台输出内容如下:

    当前客户端的状态:连接中...
    节点 /super/testNode 的数据为:test-data
    该节点的数据版本号为:0
    当前客户端的状态:已关闭...
    

    2.获取某个节点下的子节点列表,现有一个节点的子节点列表如下:

    [zk: localhost:2181(CONNECTED) 33] ls /workspace/super/testNode
    [threeNode, twoNode, oneNode]
    [zk: localhost:2181(CONNECTED) 34]
    

    修改 CuratorConnect 类中的main方法代码如下,一些重复的代码就忽略了:

    ...
    public class CuratorConnect {
        ...
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 节点路径
            String nodePath = "/super/testNode";
    
            // 获取子节点列表
            List<String> childNodes = curatorConnect.client.getChildren().forPath(nodePath);
            System.out.println(nodePath + " 节点下的子节点列表:");
            for (String childNode : childNodes) {
                System.out.println(childNode);
            }
    
            Thread.sleep(1000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    运行该类,控制台输出内容如下:

    当前客户端的状态:连接中...
    /super/testNode 节点下的子节点列表:
    threeNode
    twoNode
    oneNode
    当前客户端的状态:已关闭...
    

    3.查询某个节点是否存在,修改 CuratorConnect 类中的main方法代码如下,一些重复的代码就忽略了:

    ...
    public class CuratorConnect {
        ...
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 节点路径
            String nodePath = "/super/testNode";
    
            // 查询某个节点是否存在,存在就会返回该节点的状态信息,如果不存在的话则返回空
            Stat statExist = curatorConnect.client.checkExists().forPath(nodePath);
            if (statExist == null) {
                System.out.println(nodePath + " 节点不存在");
            } else {
                System.out.println(nodePath + " 节点存在");
            }
    
            Thread.sleep(1000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    运行该类,控制台输出内容如下:

    当前客户端的状态:连接中...
    /super/testNode 节点存在
    当前客户端的状态:已关闭...
    

    如果查询一个不存在的节点,就会返回null,我们可以测试一下将nodePath改成一个不存在的节点。然后运行该类,控制台输出内容如下:

    当前客户端的状态:连接中...
    /super/asdasdasd 节点不存在
    当前客户端的状态:已关闭...
    

    至此,使用curator对zookeeper节点的增删查改操作就演示完毕了。


    curator之usingWatcher

    curator在注册watch事件上,提供了一个usingWatcher方法,使用这个方法注册的watch事件和默认watch事件一样,监听只会触发一次,监听完毕后就会销毁,也就是一次性的。而这个方法有两种参数可选,一个是zk原生API的Watcher接口的实现类,另一个是Curator提供的CuratorWatcher接口的实现类,不过在usingWatcher方法上使用哪一个效果都是一样的,都是一次性的。

    新建一个 MyWatcher 实现类,实现 Watcher 接口。代码如下:

    package org.zero01.zk.curator;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    
    /**
     * @program: zookeeper-connection
     * @description:  zk原生API的Watcher接口实现
     * @author: 01
     * @create: 2018-04-28 13:41
     **/
    public class MyWatcher implements Watcher {
    
        // Watcher事件通知方法
        public void process(WatchedEvent watchedEvent) {
            System.out.println("触发watcher,节点路径为:" + watchedEvent.getPath());
        }
    }
    

    新建一个 MyCuratorWatcher 实现类,实现 CuratorWatcher 接口。代码如下:

    package org.zero01.zk.curator;
    
    import org.apache.curator.framework.api.CuratorWatcher;
    import org.apache.zookeeper.WatchedEvent;
    
    /**
     * @program: zookeeper-connection
     * @description: Curator提供的CuratorWatcher接口实现
     * @author: 01
     * @create: 2018-04-28 13:40
     **/
    public class MyCuratorWatcher implements CuratorWatcher {
    
        // Watcher事件通知方法
        public void process(WatchedEvent watchedEvent) throws Exception {
            System.out.println("触发watcher,节点路径为:" + watchedEvent.getPath());
        }
    }
    

    修改 CuratorConnect 类的main方法代码如下,因为在usingWatcher方法上使用一个接口的实现类效果都是一样的,所以这里就只演示其中一种。代码如下:

    ...
    public class CuratorConnect {
        ...
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 节点路径
            String nodePath = "/super/testNode";
    
            // 添加 watcher 事件,当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
            curatorConnect.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
            // curatorConnect.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
    
            Thread.sleep(100000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    运行该类,然后到zookeeper服务器上修改/super/testNode节点的数据:

    [zk: localhost:2181(CONNECTED) 35] set /workspace/super/testNode new-data
    cZxid = 0xb00000015
    ctime = Sat Apr 28 20:59:57 CST 2018
    mZxid = 0xb0000002b
    mtime = Sat Apr 28 21:40:58 CST 2018
    pZxid = 0xb0000001c
    cversion = 3
    dataVersion = 2
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 8
    numChildren = 3
    [zk: localhost:2181(CONNECTED) 36] 
    

    修改完成后,此时控制台输出内容如下,因为workspace是命名空间节点,所以不会被打印出来:

    触发watcher,节点路径为:/super/testNode
    

    curator之nodeCache一次注册N次监听

    想要实现watch一次注册n次监听的话,我们需要使用到curator里的一个NodeCache对象。这个对象可以用来缓存节点数据,并且可以给节点添加nodeChange事件,当节点的数据发生变化就会触发这个事件。

    我们依旧是使用之前的demo进行演示,修改 CuratorConnect 类中的main方法代码如下:

    ...
    public class CuratorConnect {
        ...
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 节点路径
            String nodePath = "/super/testNode";
    
            // NodeCache: 缓存节点,并且可以监听数据节点的变更,会触发事件
            final NodeCache nodeCache = new NodeCache(curatorConnect.client, nodePath);
    
            // 参数 buildInitial : 初始化的时候获取node的值并且缓存
            nodeCache.start(true);
    
            // 获取缓存里的节点初始化数据
            if (nodeCache.getCurrentData() != null) {
                System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
            } else {
                System.out.println("节点初始化数据为空...");
            }
    
            // 为缓存的节点添加watcher,或者说添加监听器
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                // 节点数据change事件的通知方法
                public void nodeChanged() throws Exception {
                    // 防止节点被删除时发生错误
                    if (nodeCache.getCurrentData() == null) {
                        System.out.println("获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除");
                        return;
                    }
                    // 获取节点最新的数据
                    String data = new String(nodeCache.getCurrentData().getData());
                    System.out.println(nodeCache.getCurrentData().getPath() + " 节点的数据发生变化,最新的数据为:" + data);
                }
            });
    
            Thread.sleep(200000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    运行该类后,我们到zookeeper服务器上,对/super/testNode节点进行如下操作:

    [zk: localhost:2181(CONNECTED) 2] set /workspace/super/testNode change-data     
    cZxid = 0xb00000015
    ctime = Sat Apr 28 20:59:57 CST 2018
    mZxid = 0xb00000037
    mtime = Sat Apr 28 23:49:42 CST 2018
    pZxid = 0xb0000001c
    cversion = 3
    dataVersion = 6
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 11
    numChildren = 3      
    [zk: localhost:2181(CONNECTED) 3] set /workspace/super/testNode change-agin-data
    cZxid = 0xb00000015
    ctime = Sat Apr 28 20:59:57 CST 2018
    mZxid = 0xb00000038
    mtime = Sat Apr 28 23:50:01 CST 2018
    pZxid = 0xb0000001c
    cversion = 3
    dataVersion = 7
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 16
    numChildren = 3
    [zk: localhost:2181(CONNECTED) 8] delete /workspace/super/testNode
    [zk: localhost:2181(CONNECTED) 9] create /workspace/super/testNode test-data
    Created /workspace/super/testNode
    [zk: localhost:2181(CONNECTED) 10]
    

    此时控制台输出内容如下:

    当前客户端的状态:连接中...
    节点初始化数据为:new-data
    /super/testNode 节点的数据发生变化,最新的数据为:change-data
    /super/testNode 节点的数据发生变化,最新的数据为:change-agin-data
    获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除
    /super/testNode 节点的数据发生变化,最新的数据为:test-data
    当前客户端的状态:已关闭...
    

    从控制台输出的内容可以看到,只要数据发生改变了都会触发这个事件,并且是可以重复触发的,而不是一次性的。


    curator之PathChildrenCache子节点监听

    使用NodeCache虽然能实现一次注册n次监听,但是却只能监听一个nodeChanged事件,也就是说创建、删除以及子节点的事件都无法监听。如果我们要监听某一个节点的子节点的事件,或者监听某一个特定节点的增删改事件都需要借助PathChildrenCache来实现。从名称上可以看到,PathChildrenCache也是用缓存实现的,并且也是一次注册n次监听。当我们传递一个节点路径时是监听该节点下的子节点事件,如果我们要限制监听某一个节点,只需要加上判断条件即可。

    我们这里演示简单子节点事件的监听,修改 CuratorConnect 类的main方法代码如下:

    ...
    public class CuratorConnect {
        ...
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 父节点路径
            String nodePath = "/super/testNode";
    
            // 为子节点添加watcher
            // PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
            final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, nodePath, true);
    
            /**
             * StartMode: 初始化方式
             * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
             * NORMAL:异步初始化
             * BUILD_INITIAL_CACHE:同步初始化
             */
            childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    
            // 列出子节点数据列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得,异步是获取不到的
            List<ChildData> childDataList = childrenCache.getCurrentData();
            System.out.println("当前节点的子节点详细数据列表:");
            for (ChildData childData : childDataList) {
                System.out.println("\t* 子节点路径:" + new String(childData.getPath()) + ",该节点的数据为:" + new String(childData.getData()));
            }
    
            // 添加事件监听器
            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                    // 通过判断event type的方式来实现不同事件的触发
                    if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                        System.out.println("\n--------------\n");
                        System.out.println("子节点初始化成功");
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                        System.out.println("\n--------------\n");
                        System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                        System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                        System.out.println("\n--------------\n");
                        System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                        System.out.println("\n--------------\n");
                        System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                        System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                    }
                }
            });
    
            Thread.sleep(200000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    运行该类,然后到zookeeper服务器上执行如下操作:

    [zk: localhost:2181(CONNECTED) 0] create /workspace/super/testNode/fourNode four-node-data
    Created /workspace/super/testNode/fourNode
    [zk: localhost:2181(CONNECTED) 1] set /workspace/super/testNode/oneNode change-node-data
    cZxid = 0xc00000002
    ctime = Sun Apr 29 18:23:57 CST 2018
    mZxid = 0xc00000023
    mtime = Sun Apr 29 20:16:22 CST 2018
    pZxid = 0xc00000002
    cversion = 0
    dataVersion = 3
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 16
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 2] delete /workspace/super/testNode/fourNode
    [zk: localhost:2181(CONNECTED) 5] create /workspace/super/testNode/fiveNode five-node-data
    Created /workspace/super/testNode/fiveNode
    [zk: localhost:2181(CONNECTED) 6] 
    

    此时,控制台输出内容如下:

    当前客户端的状态:连接中...
    当前节点的子节点详细数据列表:
        * 子节点路径:/super/testNode/oneNode,该节点的数据为:one-node-data
        * 子节点路径:/super/testNode/threeNode,该节点的数据为:three-node-data
        * 子节点路径:/super/testNode/twoNode,该节点的数据为:two-node-data
    
    --------------
    
    子节点:/super/testNode/fourNode 添加成功,该子节点的数据为:four-node-data
    
    --------------
    
    子节点:/super/testNode/oneNode 数据更新成功,子节点:/super/testNode/oneNode 新的数据为:change-node-data
    
    --------------
    
    子节点:/super/testNode/fourNode 删除成功
    
    --------------
    
    子节点:/super/testNode/fiveNode 添加成功,该子节点的数据为:five-node-data
    当前客户端的状态:已关闭...
    

    以上的演示例子中为了获取子节点列表,所以我们的代码使用的是同步初始化模式。如果使用异步初始化是获取不到子节点列表的,例如修改childrenCache.start代码如下:

    childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    

    这种模式会在初始化时触发子节点初始化以及添加子节点这两个事件,将初始化模式修改成POST_INITIALIZED_EVENT后,运行该类,此时控制台输出信息如下:

    当前客户端的状态:连接中...
    当前节点的子节点详细数据列表:
    
    --------------
    
    子节点:/super/testNode/threeNode 添加成功,该子节点的数据为:three-node-data
    
    --------------
    
    子节点:/super/testNode/twoNode 添加成功,该子节点的数据为:two-node-data
    
    --------------
    
    子节点:/super/testNode/fiveNode 添加成功,该子节点的数据为:five-node-data
    
    --------------
    
    子节点:/super/testNode/oneNode 添加成功,该子节点的数据为:change-node-data
    
    --------------
    
    子节点:/super/testNode/fourNode 添加成功,该子节点的数据为:four-node-data
    
    --------------
    
    子节点初始化成功
    当前客户端的状态:已关闭...
    

    从控制台输出信息中可以看到,在这种模式下,子节点列表并没有被获取出来。除此之外,会触发添加子节点事件以及子节点初始化事件。因为缓存初始化时是把子节点添加到缓存里,所以会触发添加子节点事件,而添加完成之后,就会触发子节点初始化完成事件。

    我们再来看看另一种异步初始化的模式:NORMAL模式,在这种模式下,同样的无法获取子节点列表,并且也会触发添加子节点事件,但是不会触发子节点初始化完成事件。修改childrenCache.start代码如下:

    childrenCache.start(PathChildrenCache.StartMode.NORMAL);
    

    将初始化模式修改成NORMAL后,运行该类,此时控制台输出信息如下:

    当前客户端的状态:连接中...
    当前节点的子节点详细数据列表:
    
    --------------
    
    子节点:/super/testNode/threeNode 添加成功,该子节点的数据为:three-node-data
    
    --------------
    
    子节点:/super/testNode/twoNode 添加成功,该子节点的数据为:two-node-data
    
    --------------
    
    子节点:/super/testNode/fiveNode 添加成功,该子节点的数据为:five-node-data
    
    --------------
    
    子节点:/super/testNode/oneNode 添加成功,该子节点的数据为:change-node-data
    
    --------------
    
    子节点:/super/testNode/fourNode 添加成功,该子节点的数据为:four-node-data
    当前客户端的状态:已关闭...
    

    从控制台输出信息中可以看到,在这种模式下,子节点列表并没有被获取出来。除此之外,还会触发添加子节点事件。通常使用异步初始化的情况下,都是使用POST_INITIALIZED_EVENT模式,NORMAL较为少用。

    如果我们想要监听某一个特定的节点,例如我们要监听/super/testNode这个节点,那么以上面的代码作为例子,就需要把nodePath改为/super。然后在每一个判断条件中,再加上一个子判断条件,将节点限定为/super/testNode才会触发,这样就能实现监听某一个节点的增删改事件了。如下示例:

    ...
    public class CuratorConnect {
        ...
        
        private static final String PARENT_NODE_PATH = "/super";  // 父节点
        private static final String NODE_PATH = "/super/testNode";  // 需要被特定监听的节点
        
        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorConnect curatorConnect = new CuratorConnect();
            // 获取当前客户端的状态
            boolean isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
    
            // 为子节点添加watcher
            // PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
            final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, PARENT_NODE_PATH, true);
    
            childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    
            // 添加事件监听器
            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                    // 通过判断event type的方式来实现不同事件的触发
                    if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                        System.out.println("\n--------------\n");
                        System.out.println("子节点初始化成功");
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                        if (event.getData().getPath().trim().equals(NODE_PATH)) {
                            System.out.println("\n--------------\n");
                            System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                            System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                        }
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                        if (event.getData().getPath().trim().equals(NODE_PATH)) {
                            System.out.println("\n--------------\n");
                            System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                        }
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                        if (event.getData().getPath().trim().equals(NODE_PATH)) {
                            System.out.println("\n--------------\n");
                            System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                            System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                        }
                    }
                }
            });
    
            Thread.sleep(200000);
    
            // 关闭客户端
            curatorConnect.closeZKClient();
    
            // 获取当前客户端的状态
            isZkCuratorStarted = curatorConnect.client.isStarted();
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
        }
    }
    

    这是最简单粗暴的方式了,当然在实际开发中,肯定会写得好一些,这个演示只是为了说明可以借助PathChildrenCache来实现某个特点节点的增删改事件监听。


    zk-watcher应用实例之模拟统一更新N台节点的配置文件

    zookeeper有一个比较常见的应用场景就是统一管理、更新分布式集群环境中每个节点的配置文件,我们可以在代码中监听集群中的节点,当节点数据发生改变时就同步到其他节点上。如下图:


    image.png

    因为我们使用的json作为节点存储的数据格式,所以需要准备一个工具类来做json与pojo对象的一个转换,也就是所谓的反序列化。创建一个 JsonUtils 类,代码如下:

    package org.zero01.zk.util;
    
    import java.util.List;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.JavaType;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    /**
     * @Title: JsonUtils.java
     * @Package org.zero01.zk.util
     * @Description: JSON/对象转换类
     */
    public class JsonUtils {
    
        // 定义jackson对象
        private static final ObjectMapper MAPPER = new ObjectMapper();
    
        /**
         * 将对象转换成json字符串。
         * <p>Title: pojoToJson</p>
         * <p>Description: </p>
         * @param data
         * @return
         */
        public static String objectToJson(Object data) {
            try {
                String string = MAPPER.writeValueAsString(data);
                return string;
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        /**
         * 将json结果集转化为对象
         *
         * @param jsonData json数据
         * @param beanType 对象中的object类型
         * @return
         */
        public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
            try {
                T t = MAPPER.readValue(jsonData, beanType);
                return t;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    
        /**
         * 将json数据转换成pojo对象list
         * <p>Title: jsonToList</p>
         * <p>Description: </p>
         * @param jsonData
         * @param beanType
         * @return
         */
        public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
            JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
            try {
                List<T> list = MAPPER.readValue(jsonData, javaType);
                return list;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    

    创建一个pojo类,封装json格式的数据:

    package org.zero01.zk.util;
    
    public class RedisConfig {
    
        private String type;    // add 新增配置 update 更新配置 delete 删除配置
        private String url;        // 如果是add或update,则提供下载地址
        private String remark;    // 备注
        ... gtter stter 略 ...
    }
    

    然后创建客户端类,客户端类就是用来监听集群中的节点的。由于是模拟,所以这里的部分代码是伪代码。客户端类我们这里创建了三个,因为集群中有三个节点,由于代码基本上是一样的,所以这里只贴出客户端_1的代码。如下:

    package org.zero01.zk.checkconfig;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
    import org.apache.curator.retry.RetryNTimes;
    
    import org.zero01.zk.util.JsonUtils;
    import org.zero01.zk.util.RedisConfig;
    
    public class Client_1 {
    
        public CuratorFramework client = null;
        public static final String zkServerIp = "192.168.190.128:2181";
    
        // 初始化重连策略以及客户端对象并启动
        public Client_1() {
            RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerIp)
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                    .namespace("workspace").build();
            client.start();
        }
    
        // 关闭客户端
        public void closeZKClient() {
            if (client != null) {
                this.client.close();
            }
        }
    
        //  public final static String CONFIG_NODE = "/super/testNode/redis-config";
        public final static String CONFIG_NODE_PATH = "/super/testNode";
        public final static String SUB_PATH = "/redis-config";
        public static CountDownLatch countDown = new CountDownLatch(1);  // 计数器
    
        public static void main(String[] args) throws Exception {
            Client_1 cto = new Client_1();
            System.out.println("client1 启动成功...");
    
            // 开启子节点缓存
            final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
            childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
    
            // 添加子节点监听事件
            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    // 监听节点的数据更新事件
                    if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                        String configNodePath = event.getData().getPath();
                        if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
                            System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
    
                            // 读取节点数据
                            String jsonConfig = new String(event.getData().getData());
                            System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);
    
                            // 从json转换配置
                            RedisConfig redisConfig = null;
                            if (StringUtils.isNotBlank(jsonConfig)) {
                                redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
                            }
    
                            // 配置不为空则进行相应操作
                            if (redisConfig != null) {
                                String type = redisConfig.getType();
                                String url = redisConfig.getUrl();
                                String remark = redisConfig.getRemark();
                                // 判断事件
                                if (type.equals("add")) {
                                    System.out.println("\n-------------------\n");
                                    System.out.println("监听到新增的配置,准备下载...");
                                    // ... 连接ftp服务器,根据url找到相应的配置
                                    Thread.sleep(500);
                                    System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
                                    // ... 下载配置到你指定的目录
                                    Thread.sleep(1000);
                                    System.out.println("下载成功,已经添加到项目中");
                                    // ... 拷贝文件到项目目录
                                } else if (type.equals("update")) {
                                    System.out.println("\n-------------------\n");
                                    System.out.println("监听到更新的配置,准备下载...");
                                    // ... 连接ftp服务器,根据url找到相应的配置
                                    Thread.sleep(500);
                                    System.out.println("开始下载配置文件,下载路径为<" + url + ">");
                                    // ... 下载配置到你指定的目录
                                    Thread.sleep(1000);
                                    System.out.println("下载成功...");
                                    System.out.println("删除项目中原配置文件...");
                                    Thread.sleep(100);
                                    // ... 删除原文件
                                    System.out.println("拷贝配置文件到项目目录...");
                                    // ... 拷贝文件到项目目录
                                } else if (type.equals("delete")) {
                                    System.out.println("\n-------------------\n");
                                    System.out.println("监听到需要删除配置");
                                    System.out.println("删除项目中原配置文件...");
                                }
                                // TODO 视情况统一重启服务
                            }
                        }
                    }
                }
            });
    
            countDown.await();
    
            cto.closeZKClient();
        }
    }
    

    完成以上代码的编写后,将所有的客户类都运行起来。然后到zookeeper服务器上,进行如下操作:

    [zk: localhost:2181(CONNECTED) 14] set /workspace/super/testNode/redis-config {"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}
    cZxid = 0xc00000039
    ctime = Mon Apr 30 01:43:47 CST 2018
    mZxid = 0xc00000043
    mtime = Mon Apr 30 01:52:35 CST 2018
    pZxid = 0xc00000039
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 75
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 15] set /workspace/super/testNode/redis-config {"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
    cZxid = 0xc00000039
    ctime = Mon Apr 30 01:43:47 CST 2018
    mZxid = 0xc00000044
    mtime = Mon Apr 30 01:53:46 CST 2018
    pZxid = 0xc00000039
    cversion = 0
    dataVersion = 2
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 81
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 16] set /workspace/super/testNode/redis-config {"type":"delete","url":"","remark":"delete"}   
    cZxid = 0xc00000039               
    ctime = Mon Apr 30 01:43:47 CST 2018
    mZxid = 0xc00000045
    mtime = Mon Apr 30 01:54:06 CST 2018
    pZxid = 0xc00000039
    cversion = 0
    dataVersion = 3
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 44
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 17] 
    

    此时,三个客户端的控制台输出信息如下:


    image.png
    image.png
    image.png

    如上,从三个客户端的控制台输出信息可以看到,三个节点都进行了同样操作,触发了同样的watch事件,这样就可以完成统一的配置文件管理。


    curator之acl权限操作与认证授权

    以上我们介绍了curator对节点进行增删查改以及注册watch事件的操作,最后我们来演示一下,使用curator如何对节点的acl权限进行操作以及与zk服务端建立连接登录用户实现认证授权。

    我们先演示在创建节点时设置acl权限,现在/workspace/super只有如下节点:

    [zk: localhost:2181(CONNECTED) 27] ls /workspace/super
    [xxxnode, testNode]
    [zk: localhost:2181(CONNECTED) 28]
    

    然后新建一个 CuratorAcl 类,关于acl权限的概念以及部分API代码都在之前的zk原生API使用一文中介绍过了,所以这里就不赘述了。编写代码如下:

    package org.zero01.zk.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;
    import org.zero01.zk.util.AclUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @program: zookeeper-connection
     * @description: curator操作zk节点acl权限演示demo
     * @author: 01
     * @create: 2018-04-29 19:53
     **/
    public class CuratorAcl {
    
        // Curator客户端
        public CuratorFramework client = null;
        // 集群模式则是多个ip
        private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    
        public CuratorAcl() {
            RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
            client = CuratorFrameworkFactory.builder().authorization("digest", "user1:123456a".getBytes())  // 认证授权,登录用户
                    .connectString(zkServerIps)
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                    .namespace("workspace").build();
            client.start();
        }
    
        public void closeZKClient() {
            if (client != null) {
                this.client.close();
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            // 实例化
            CuratorAcl cto = new CuratorAcl();
            boolean isZkCuratorStarted = cto.client.isStarted();
            System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
    
            String nodePath = "/super/testAclNode/testOne";
    
            // 自定义权限列表
            List<ACL> acls = new ArrayList<ACL>();
            Id user1 = new Id("digest", AclUtils.getDigestUserPwd("user1:123456a"));
            Id user2 = new Id("digest", AclUtils.getDigestUserPwd("user2:123456b"));
            acls.add(new ACL(ZooDefs.Perms.ALL, user1));
            acls.add(new ACL(ZooDefs.Perms.READ, user2));
            acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE, user2));
    
            // 创建节点,使用自定义权限列表来设置节点的acl权限
            byte[] nodeData = "child-data".getBytes();
            cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(nodePath, nodeData);
    
            cto.closeZKClient();
            boolean isZkCuratorStarted2 = cto.client.isStarted();
            System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
        }
    }
    

    运行该类,然后到zookeeper服务器上,通过命令行进行如下操作:

    [zk: localhost:2181(CONNECTED) 19] ls /workspace/super/testAclNode    
    [testOne]
    [zk: localhost:2181(CONNECTED) 20] getAcl /workspace/super/testAclNode
    'world,'anyone
    : cdrwa
    [zk: localhost:2181(CONNECTED) 21] getAcl /workspace/super/testAclNode/testOne
    'digest,'user1:TQYTqd46qVVbWpOd02tLO5qb+JM=
    : cdrwa
    'digest,'user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
    : r
    'digest,'user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
    : cd
    [zk: localhost:2181(CONNECTED) 22] 
    

    可以看到,当递归创建节点时,只会对最末端的子节点赋予自定义的acl权限,父节点都是zk默认的匿名权限。

    如果想要在递归创建节点时,父节点和子节点的acl权限都是我们自定义的权限,那么就需要在withACL方法中,传递一个true,表示递归创建时所有节点的权限,都是我们设置的权限。修改代码如下:

    // 创建节点,使用自定义权限列表来设置节点的acl权限
    byte[] nodeData = "child-data".getBytes();
    cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls, true).forPath(nodePath, nodeData);
    

    运行该类,然后到zookeeper服务器上,通过命令行进行如下操作:

    [zk: localhost:2181(CONNECTED) 28] getAcl /workspace/super/testAclNodeTwo
    'digest,'user1:TQYTqd46qVVbWpOd02tLO5qb+JM=
    : cdrwa
    'digest,'user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
    : r
    'digest,'user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
    : cd
    [zk: localhost:2181(CONNECTED) 30] getAcl /workspace/super/testAclNodeTwo/testOne
    'digest,'user1:TQYTqd46qVVbWpOd02tLO5qb+JM=
    : cdrwa
    'digest,'user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
    : r
    'digest,'user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
    : cd
    [zk: localhost:2181(CONNECTED) 31] 
    

    如上,可以看到,创建的全部节点的acl权限都是我们设置的自定义权限。

    最后我们再来演示如何修改一个已存在的节点的acl权限,修改 CuratorAcl 类中的main方法代码如下:

        public static void main(String[] args) throws Exception {
            // 实例化
            CuratorAcl cto = new CuratorAcl();
            boolean isZkCuratorStarted = cto.client.isStarted();
            System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
    
            String nodePath = "/super/testAclNodeTwo/testOne";
    
            // 自定义权限列表
            List<ACL> acls = new ArrayList<ACL>();
            Id user1 = new Id("digest", AclUtils.getDigestUserPwd("user1:123456a"));
            Id user2 = new Id("digest", AclUtils.getDigestUserPwd("user2:123456b"));
            acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE | ZooDefs.Perms.ADMIN, user1));
            acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE, user2));
    
            // 设置指定节点的acl权限
            cto.client.setACL().withACL(acls).forPath(nodePath);
    
            cto.closeZKClient();
            boolean isZkCuratorStarted2 = cto.client.isStarted();
            System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
        }
    

    运行该类,然后到zookeeper服务器上,通过命令行进行如下操作:

    [zk: localhost:2181(CONNECTED) 31] getAcl /workspace/super/testAclNodeTwo/testOne
    'digest,'user1:TQYTqd46qVVbWpOd02tLO5qb+JM=
    : cra
    'digest,'user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
    : cdr
    [zk: localhost:2181(CONNECTED) 32] 
    

    可以看到,成功修改了该节点的acl权限

    相关文章

      网友评论

          本文标题:使用Apache Curator操作ZooKeeper

          本文链接:https://www.haomeiwen.com/subject/fsopictx.html