美文网首页
通过Java操作Zookeeper

通过Java操作Zookeeper

作者: liangyongtong | 来源:发表于2019-11-12 14:11 被阅读0次

    原谅链接: https://mp.weixin.qq.com/s/PgDXSFGnA7kZNX7Qpxg04A

    之前写了一篇通过zkCli操作zookeeper的文章,这一篇是通过Java操作zookeeper的文章,代码在这: https://github.com/liangyt/ZookeeperTest/tree/master/base

    因为使用的是 zookeeper-3.5.5 版本,所以使用对应的版本。

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.5</version>
    </dependency>
    

    有一些回调的定义类比较简单,这里不就列表出来了,可以进代码直接翻看就行了。

    一、连接服务

    要操作zookeeper第一件事就是连接上zookeeper服务器:

    在类 {common.ZkConnect}

    /**
     * 创建zk连接
     * @return
     * @throws IOException
     */
    public static ZooKeeper instance() throws IOException {
        ZooKeeper zk = new ZooKeeper(
                "localhost:2181", // 连接的服务地址
                5000,  // 会话超时时间, 在超时时间内会进行心跳检测;如果超过这个时间没有心跳检测,则服务端认为这个会话超时了
                new DefaultWatcher() // 默认的会话监听器, 如果设置为 null 则表示没有默认的监听器了
        );
        return zk;
    }
    
    /**
     * 创建zk连接
     * @return
     * @throws IOException
     */
    public static ZooKeeper instance(CountDownLatch latch) throws IOException {
        ZooKeeper zk = new ZooKeeper(
                "localhost:2181", // 连接的服务地址
                5000,  // 会话超时时间, 在超时时间内会进行心跳检测;如果超过这个时间没有心跳检测,则服务端认为这个会话超时了
                new DefaultWatcher(latch) // 默认的会话监听器, 如果设置为 null 则表示没有默认的监听器了
        );
        return zk;
    }
    
    /**
     * 创建zk连接
     * @param sessionId 会话id zk.getSessionId()
     * @param sessionpwd 会话密码 zk.getSessionPasswd()
     * @return
     * @throws IOException
     */
    public static ZooKeeper instance(long sessionId, byte[] sessionpwd) throws IOException {
        ZooKeeper zk = new ZooKeeper(
                "127.0.0.1:2181",
                5000,
                new DefaultWatcher(),
                sessionId,
                sessionpwd
                );
    
        return zk;
    }
    
    /**
     * 创建zk连接
     * @param sessionId 会话id zk.getSessionId()
     * @param sessionpwd 会话密码 zk.getSessionPasswd()
     * @Param latch 同步对象
     * @return
     * @throws IOException
     */
    public static ZooKeeper instance(long sessionId, byte[] sessionpwd, CountDownLatch latch) throws IOException {
        ZooKeeper zk = new ZooKeeper(
                "127.0.0.1:2181",
                5000,
                new DefaultWatcher(latch),
                sessionId,
                sessionpwd
            );
        return zk;
    }
    

    创建了几个连接zookeeper服务器的简易方法,设置了默认的服务器地址[127.0.0.1:2181],这个地址可以是单台服务,也可以是集群服务,如果是群集的话则格式为 [ip1:port1,ip2:port2 ...]。

    二、创建节点

    节点的创建方法有几个:

    image

    这6个方法分同步和异步,不带 callback 的是同步方法。

    具体使用方法看这个类:{base1.CreateTest}

    private static void normalCreate(ZooKeeper zk) throws KeeperException, InterruptedException {
        zk.create(
                "/zk-java-01", // 节点路径
                "你们好".getBytes(), // 节点内容
    
                /**
                 * 节点权限 ZooDefs.Ids.OPEN_ACL_UNSAFE -> 是 world anyone 所有的权限
                 * 可以自已定义一个权限列表: ArrayList<ACL>
                 * new ACL("per", "ID") 构成一个权限对象 可以设置多个
                 * per 表示操作权限:READ = 1; WRITE = 2; CREATE = 4; DELETE = 8; ADMIN = 16; ALL = 31;
                 * ID 表示授权对象 new ID("scheme", "id") 如 new ID("world", "anyone")
                 */
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
    
                /**
                 * 节点类型: 使用 zkCli 客户端创建节点一般是只有前四种
                 * PERSISTENT 持久型
                 * PERSISTENT_SEQUENTIAL 持久有序型
                 * EPHEMERAL 临时型
                 * EPHEMERAL_SEQUENTIAL 临时有序型
                 * CONTAINER 容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点
                 * PERSISTENT_WITH_TTL 有TTL[存活时间]的永久节点,节点在TTL时间之内没有得到更新并且无子节点,就会被自动删除 需要配合另外一个参数一起
                 * PERSISTENT_SEQUENTIAL_WITH_TTL 有TTL[存活时间]和有序的永久节点,节点在TTL时间之内没有得到更新并且无子节点,就会被自动删除 需要配合另外一个参数一起
                 */
                CreateMode.PERSISTENT // 永久节点
        );
    }
    
    public static void statCreate(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
        // 用于保存节点创建完成后的状态信息
        Stat stat = new Stat();
        String p = zk.create(
                path,
                "带自定义状态".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT,
                /**
                 * 节点状态:
                 * czxid;
                 * mzxid;
                 * ctime;
                 * mtime;
                 * version;
                 * cversion;
                 * aversion;
                 * ephemeralOwner;
                 * dataLength;
                 * numChildren;
                 * pzxid;
                 */
                stat
        );
    
        System.out.println("path ->" + p);
        System.out.println("stat -> " + stat);
    }
    
    private static void asyncCreate(ZooKeeper zk) throws InterruptedException {
        zk.create(
                "/zk-java-async-01",
                "带自定义状态".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT,
                new BaseStringCallback(), // 节点创建完成的回调
                "我是异步创建节点"
        );
    }
    

    三、获取节点数据

    image

    获取节点数据有四个方法,使用主要看类:{base1.GetTest}

    public static String getData(ZooKeeper zk, Stat stat) throws KeeperException, InterruptedException {
        byte[] data = zk.getData(
                "/zk-java-stat-01", // 节点路径
                true,  // 是否使用默认监听器
                stat // 用于存放服务器返回的 stat
        );
    
        System.out.println("data -> " + new String(data));
    
        System.out.println("aversion -> " + stat.getAversion());
        System.out.println("ctime -> " + stat.getCtime());
        System.out.println("cversion -> " + stat.getCversion());
        System.out.println("dataLength -> " + stat.getDataLength());
        System.out.println("version -> " + stat.getVersion());
    
        return new String(data);
    }
    
    public static String getDataNewWatcher(ZooKeeper zk) throws KeeperException, InterruptedException {
    
        Stat stat = new Stat();
        byte[] data = zk.getData(
                "/zk-java-stat-01",
                new CustomWatcher(), // 注册节点内容变更监听
                stat
        );
    
        System.out.println("data -> " + new String(data));
    
        return new String(data);
    }
    
    private static void asyncGetData(ZooKeeper zk) {
        zk.getData(
                "/zk-java-stat-01",
                false, // 不使用监听器
                new DataCallback(), // 异步获取数据的回调
                "异步获取数据"
        );
    
        zk.getData(
                "/zk-java-stat-01",
                new CustomWatcher(), // 使用自定义数据变更监听器 节点内容变更监听
                new DataCallback(), // 异步获取数据的回调
                "异步获取监听数据"
        );
    }
    

    四、更新节点数据

    image

    更新节点数据方法只有两个了,使用方法看类:{base1.UpdateTest}

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = ZkConnect.instance(latch);
        latch.await();
    
        Stat gStat = new Stat();
        String oldData = GetTest.getData(zk, gStat);
    
        System.out.println("oldData -> " + oldData);
        System.out.println("oldVersion -> " + gStat.getVersion());
    
        Stat stat = zk.setData(
                "/zk-java-stat-01", // 节点路径
                "我是新数据".getBytes(), // 新数据
                gStat.getVersion() // 如果版本号跟服务器上保存的不一样, 则此时出现异常 org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /zk-java-stat-01
                );
        // 此时版本号已更新
        System.out.println("newVersion -> " + stat.getVersion());
    
        CountDownLatch latch1 = new CountDownLatch(1);
        zk.setData(
                "/zk-java-stat-01", // 节点路径
                "我是异步更新新数据".getBytes(), // 新数据
                stat.getVersion(),
                new StatCallback(latch1),
                "异步更新数据"
        );
    
        latch1.await();
    }
    

    五、删除节点

    image.gif

    两个方法,一个同步删除,一个异步删除。代码在类:{base1.DeleteTest}

    private static void baseDelete(ZooKeeper zk) throws KeeperException, InterruptedException {
        zk.delete(
                "/zk-java-01", // 需要删除的节点全路径
                0 // 删除节点的版本号 如果版本号对不上的话则删除失败
        );
    }
    
    private static void asyncDelete(ZooKeeper zk) {
        zk.delete(
                "/zk-java-01",
                0,
                new BaseVoidCallback(), // 删除回调
                "删除基本节点"
        );
    }
    

    六、获取子节点列表

    image

    获取子节点列表的方法比较多,这里试了一个同步一个异步的,类:{base1.GetChildrenTest}

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = ZkConnect.instance(latch);
        latch.await();
    
        // 直接获取节点的子节点
        List<String> children = getChildren(zk, "/zk-java-stat-01");
    
        System.out.println("children -> " + children);
    
        // 添加一个子节点
        // 重复添加子节点出现节点重复异常
        // org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zk-java-stat-01/child
        CreateTest.statCreate(zk, "/zk-java-stat-01/child01");
    
        // 再次获取子节点
        children = getChildren(zk, "/zk-java-stat-01");
    
        System.out.println("children -> " + children);
    
        // 异常获取子节点列表 并添加一个自定义子节点列表变更监听
        latch = new CountDownLatch(1);
        zk.getChildren(
                "/zk-java-stat-01",
                new CustomWatcher(),
                new ChildrenCallback(latch),
                "异步获取子节点列表"
        );
    
        latch.await();
    }
    
    private static List<String> getChildren(ZooKeeper zk, String pPath) throws KeeperException, InterruptedException {
        // 返回的子节点列表路径都是相对于父节点的路径,而不是全路径
        return zk.getChildren(
                    pPath, // 需要获取子节点列表的节点路径
                    false // 不添加监听
            );
    }
    

    七、权限

    image

    设置权限有两个方法,也比较简单;类:{base1.AclTest}

    public static void main(String[] args) throws Exception {
        String node = "/zk-java-acl";
    
        CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = ZkConnect.instance(latch);
        latch.await();
    
        // 设置超级权限
        /**
         * 设置超级账户连接, super:id 这是 digest:id 的一种特殊方式
         * 启用超级账户需要启动zookeeper服务的时候配置对应的参数(这种提供两种方式):
         * 1. 启动的时候直接添加 -Dzookeeper.DigestAuthenticationProvider.superDigest=zookeeper:qW/HnTfCSoQpB5G8LgkwT3IbiFc=
         * 2. 在配置文件 zoo.cfg 里面配置 DigestAuthenticationProvider.superDigest=zookeeper:qW/HnTfCSoQpB5G8LgkwT3IbiFc=
         * 这两种方式都可以配置超级账户,账户可以自定义,结果: BASE64(SHA-1)
         * 可以命令行生成也可以java代码运行生成:
         * java:  DigestAuthenticationProvider.generateDigest(name:password)
         *
         */
        zk.addAuthInfo("digest", "zookeeper:admin".getBytes());
    
        // 先创建一个永久节点, 权限为 ZooDefs.Ids.OPEN_ACL_UNSAFE
        // 如果报节点已存在异常 则把这一行注释掉
        // org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zk-java-acl
        CreateTest.statCreate(zk, node);
    
        // 获取节点的权限
        Stat stat = new Stat();
        List<ACL> acls = zk.getACL(
                node, // 需要获取权限信息的节点
                stat // 回设节点状态
        );
    //
    //        //  [31,s{'world,'anyone}] 这是默认创建的节点权限
        System.out.println("acls ->" + acls);
        System.out.println("aclVersion -> " + stat.getAversion());
    
        // 异步获取节点权限
        zk.getACL(node, stat, new AsyncCallback.ACLCallback() {
            @Override
            public void processResult(int resultCode, String path, Object ctx, List<ACL> list, Stat stat) {
                System.out.println("async acls ->" + list);
            }
        }, "异步获取权限");
    
    //
        String auth = "name:password";
        // 对节点设置权限
        acl(node, zk, stat, auth);
    
    
        // 另起一个无权限会话 读取该节点的数据看看
    //        noAuthGetData(node);
    
        // 另起一个会话并设置权限 读取该节点数据
        authGetData(node, auth);
    
        // 删除节点
        zk.getData(node, null, stat);
        zk.delete(node, stat.getVersion());
    }
    
    // 给节点设置权限
    private static void acl(String node, ZooKeeper zk, Stat stat, String auth) throws NoSuchAlgorithmException, KeeperException, InterruptedException {
        // 创建授权对象
        Id id = new Id("digest", DigestUtil.digest(auth));
        // 可以定义一个 ip 模式的授权对象
    //        Id ipId = new Id("ip", "192.168.3.17");
    
        // 定义操作权限
        ACL aclRead = new ACL(ZooDefs.Perms.READ, id); // 读取权限
        ACL aclCrd = new ACL(ZooDefs.Perms.CREATE, id); // 创建权限
        ACL aclDel = new ACL(ZooDefs.Perms.DELETE, id); // 删除权限
        ACL aclUpd = new ACL(ZooDefs.Perms.WRITE, id); // 更新权限
        ACL aclAdm = new ACL(ZooDefs.Perms.ADMIN, id); // 权限管理权限
    
        // 对节点 /zk-java-acl 设置权限
        List<ACL> acls = new ArrayList<>();
        acls.add(aclRead);
        acls.add(aclUpd);
    
        // 返回状态 权限版本号已改变了
        Stat aclStat = zk.setACL(node, acls, stat.getAversion());
        System.out.println("aclStatVersion -> " + aclStat.getAversion());
    }
    
    private static void authGetData(String node, String auth) throws IOException, InterruptedException, KeeperException {
        CountDownLatch latch1 = new CountDownLatch(1);
        ZooKeeper authZk = ZkConnect.instance(latch1);
        latch1.await();
    
        // 设置会话权限
        authZk.addAuthInfo("digest", auth.getBytes());
    
        Stat stat = new Stat();
    
        byte[] dataAuth = authZk.getData(node, false, stat);
        System.out.println("dataAuth -> " + new String(dataAuth));
    
        // 没有更新权限,看看能否更新
        authZk.setData(node, "试试能否更新成功".getBytes(), stat.getVersion());
    }
    
    // 测试没有读取权限的会话去读取数据是什么样的
    private static void noAuthGetData(String node) throws IOException, InterruptedException, KeeperException {
        CountDownLatch latch1 = new CountDownLatch(1);
        ZooKeeper noAuthZk = ZkConnect.instance(latch1);
        latch1.await();
    
        // 出现异常
        // org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-java-acl
        byte[] dataNoAuth = noAuthZk.getData(node, false, null);
        System.out.println("dataNoAuth -> " + new String(dataNoAuth));
    }
    
    欢迎关注

    相关文章

      网友评论

          本文标题:通过Java操作Zookeeper

          本文链接:https://www.haomeiwen.com/subject/cqtqictx.html