美文网首页
(9)zookeeper的客户端api的基本操作

(9)zookeeper的客户端api的基本操作

作者: Mrsunup | 来源:发表于2018-11-26 00:24 被阅读0次

    zookeeper的开源客户端有基于原生的zookeeper的客户端还有zkclient还有curator,下面就针对原生的api和curator的api来实现zookeeper的基本节点的操作

    1.基于zookeeper原生的客户端展示

    • 依赖jar的环境
        <dependency>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
          <version>3.4.8</version>
        </dependency>
    
    • 代码演示
    /**
     * @Project: 3.DistributedProject
     * @description: 用原生zookeeper客户端的api
     * @author: sunkang
     * @create: 2018-06-23 13:04
     * @ModificationHistory who      when       What
     **/
    public class OrginZookeeperConnectionDemo {
    
        public static void main(String[] args) throws Exception {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            Watcher watcher =  new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if( watchedEvent.getState() == Event.KeeperState.SyncConnected){
                        //连接成功会有SyncConnected事件产生
                        System.out.println("默认事件"+watchedEvent.getPath()+"->"+watchedEvent.getState()+"->"+watchedEvent.getType());
                        //如果收到了服务端的响应事件,连接成功,接下来才可以对zookeeper的数据节点进行操作
                        countDownLatch.countDown();
                    }
                }};
    
            final ZooKeeper zooKeeper = new ZooKeeper("192.168.44.129:2181", 4000,watcher );
    
            countDownLatch.await();
    
            zooKeeper.exists("/zk-test-create", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println(watchedEvent.getPath()+"->"+watchedEvent.getState()+"->"+watchedEvent.getType());
                    //再次绑定
                    try {
                        //这里会触发默认的全局事件
                        zooKeeper.exists(watchedEvent.getPath(),true);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            });
    
            //1.创建临时节点
            zooKeeper.create("/zk-test-create","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
            Thread.sleep(1000);
            System.out.println("添加节点成功");
    
    
            Stat state = new Stat();
            //2.得到当前节点的值
            byte[]  bytes =  zooKeeper.getData("/zk-test-create",null,state);
            System.out.println("createNode的当前的值为:"+ new String(bytes));
    
            //3.修改当前节点的值
            zooKeeper.setData("/zk-test-create","2".getBytes(),state.getVersion());
            //得到当前节点的值
            byte[]  byte1s =  zooKeeper.getData("/zk-test-create",null,state);
            System.out.println("createNode的修改后值为 : "+ new String(byte1s));
    
    
            //4.查看子节点
            List<String> childrenList =  zooKeeper.getChildren("/",false);
            System.out.println("childrenList: "+childrenList);
    
            //5.删除节点的值
            zooKeeper.delete("/zk-test-create",state.getVersion());
    
    
            //6.设置权限认证
            zooKeeper.addAuthInfo("digest","foo:true".getBytes());
            zooKeeper.create("/zk-book-auth_test","init".getBytes(),ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL);
    
            //7.新建客户端需要认证获取对应的节点
           byte[] authBytes =  zooKeeper.getData("/zk-book-auth_test",false,null);
            System.out.println("/zk-book-auth_test的value:"+ new String(authBytes) );
    
            zooKeeper.close();
    //        System.in.read();
        }
    }
    
    • 输出结果如下
      原生的pai需要watch触发一次就失效了,需要重复的注册watcher,一般来说,客户端启动的时候,可以传入一个全局的watcher,这个watcher会一直有效,当extist的watcher的时候,可以通过传入true来判断是否使用全局默认的watcher
    默认事件null->SyncConnected->None
    /zk-test-create->SyncConnected->NodeCreated
    添加节点成功
    createNode的当前的值为:0
    默认事件/zk-test-create->SyncConnected->NodeDataChanged
    createNode的修改后值为 : 2
    childrenList: [curator_recipes_distatomicint_path, zookeeper, zk-test-create, curator_recipes_master_path, curator_recipes_lock_path, locks, c1, curator_recipes_barrier_path]
    /zk-book-auth_test的value:init
    

    2..基于curator的客户端展示

    • 增加依赖
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-framework</artifactId>
          <version>4.0.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-recipes</artifactId>
          <version>4.0.0</version>
        </dependency>
      </dependencies>
    
    • 代码演示
    /**
     * @Project: 3.DistributedProject
     * @description: 使用curator来操作节点
     * @author: sunkang
     * @create: 2018-06-23 14:25
     * @ModificationHistory who      when       What
     **/
    public class CuratorDemo {
        public static void main(String[] args) throws Exception {
            CuratorFramework curator =CuratorFrameworkFactory.builder()
                    .connectString("192.168.44.129:2181")
                    .connectionTimeoutMs(4000)//连接超时时间设置4秒中
                    .sessionTimeoutMs(4000)//session超时设置4秒中
                    .retryPolicy(new ExponentialBackoffRetry(1000,3))//设置连接的重试机制
                    .namespace("curator")//设置命名空间,表明接下来的节点操作都在/curator的下进行操作
                    .build();
    
            //启动
            curator.start();
    
            //1.创建节点  creatingParentsIfNeeded如果子节点的父级节点不存在,会联级创建
            curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/sunkang/test","sunkang".getBytes());
            System.out.println("创建节点成功");
    
            //2.获取节点的状态
            Stat state =new Stat();
            System.out.println("/sunkang/test的值: "+ new String(curator.getData().storingStatIn(state).forPath("/sunkang/test")));
    
            //3.设置改变节点
            curator.setData().withVersion(state.getAversion()).forPath("/sunkang/test","xx".getBytes());
    
            //4.获取子节点
            List<String> childrens = curator.getChildren().forPath("/sunkang");
            System.out.println("childrens : "+childrens);
    
            //5.检查是否存在
            Stat stat =  curator.checkExists().forPath("/sunkang");
            System.out.println("state: "+ stat);
    
            //6.使用watcher
            curator.getChildren().usingWatcher(new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println(event.getPath()+"->"+event.getState()+"->"+event.getType());
                }
            }).forPath("/sunkang");
            //7.删除节点,deletingChildrenIfNeeded表示级联删除
            curator.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath("/sunkang");
    
            System.in.read();
        }
    }
    
    • 输出结果
    创建节点成功
    /sunkang/test的值: sunkang
    childrens : [test]
    state: 505,505,1542703973660,1542703973660,0,1,0,0,0,1,506
    
    /sunkang->SyncConnected->NodeChildrenChanged
    

    3..基于curator的客户端监听的展示

    主要利用了NodeCache和PathChildrenCache以及和TreeCache来实现监听。
    代码如下 :

    /**
     * @Project: 3.DistributedProject
     * @description: curator实现监听
     * @author: sunkang
     * @create: 2018-06-23 14:45
     * @ModificationHistory who      when       What
     **/
    public class CuratorWatcherDemo {
    
        public static void main(String[] args) throws Exception {
            CuratorFramework curator =CuratorFrameworkFactory.builder()
                    .connectString("192.168.44.129:2181")
                    .connectionTimeoutMs(4000)
                    .sessionTimeoutMs(4000)
                    .retryPolicy(new ExponentialBackoffRetry(1000,3))
                    .namespace("curator")
                    .build();
    
            curator.start();
            //当前节点的监听
    //        addListenerWhitNodeCash(curator,"/sunkang");
            //监听子节点的监听
    //        addListenerWhitPathChildCash(curator,"/sunkang");
            //综合性事件
            addListenerWithTreeCache(curator,"/sunkang");
            System.in.read();
    
        }
    
        /**
         * 即节点的监听又监听子节点的监听
         * @param curator
         * @param s
         * @throws Exception
         */
        private static void addListenerWithTreeCache(CuratorFramework curator, String s) throws Exception {
           final TreeCache treeCache = new TreeCache(curator,s);
           TreeCacheListener listener = new TreeCacheListener() {
               @Override
               public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                   System.out.println("zonghe "+treeCacheEvent.getData()+";"+treeCacheEvent.getType());
               }
           };
           treeCache.getListenable().addListener(listener);
           treeCache.start();
        }
    
        /**
         *对给具体的节点的子节点的增加监听,子节点的删除,创建和数据节点的内容发生变化,会触发监听事件
         * @param curator
         * @param s
         * @throws Exception
         */
        private static void addListenerWhitPathChildCash(final CuratorFramework curator, String s) throws Exception {
            final PathChildrenCache pathChildrenCache =new PathChildrenCache(curator,s,true);
    
            PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            System.out.println("CHILD_ADDED," + event.getData().getPath());
                            break;
                        case CHILD_UPDATED:
                            System.out.println("CHILD_UPDATED," + event.getData().getPath());
                            break;
                        case CHILD_REMOVED:
                            System.out.println("CHILD_REMOVED," + event.getData().getPath());
                            break;
                        default:
                            break;
                    }
                }
            };
            pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
            pathChildrenCache.start();
    
        }
    
        /**
         * 给具体的节点的增加监听,创建,删除,数据值改变
         * @param curator
         * @param s
         * @throws Exception
         */
        private static void addListenerWhitNodeCash(CuratorFramework curator, String s) throws Exception {
            final NodeCache nodeCache = new NodeCache(curator,s);
            NodeCacheListener nodeCacheListener = new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("Node data update, new data: " + new String(nodeCache.getCurrentData().getData()));
                }
            };
            nodeCache.getListenable().addListener(nodeCacheListener);
            nodeCache.start();
        }
    }
    
    

    更多的操作内容可以参考:https://github.com/sunkang123/zookeeper

    相关文章

      网友评论

          本文标题:(9)zookeeper的客户端api的基本操作

          本文链接:https://www.haomeiwen.com/subject/jynmqqtx.html