美文网首页
ZooKeeper系列之(3)——API使用

ZooKeeper系列之(3)——API使用

作者: 康康不遛猫 | 来源:发表于2018-03-26 10:42 被阅读0次

    一、ZooKeeper 原生API

    ZooKeeper API分为同步操作和异步操作,包括以下操作:
    创建会话
    创建节点
    删除节点
    读取数据、子节点
    更新数据
    检测节点是否存在
    权限控制

    1、创建会话

    ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    示例:

    public class ZooKeeper_Constructor_Usage_Simple implements Watcher {
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
         
        public static void main(String[] args) throws Exception{
             
            ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181", 
                                                5000, //sessionTimeout
                                                new ZooKeeper_Constructor_Usage_Simple());//自身实现watcher接口
            System.out.println(zookeeper.getState());//connecting状态
            try {
                connectedSemaphore.await();//等待连接成功
            } catch (InterruptedException e) {}
            System.out.println("ZooKeeper session established.");
        }
     
     //链接添加监视器,观察连接成功状态
        public void process(WatchedEvent event) {
            System.out.println("Receive watched event:" + event);
            if (KeeperState.SyncConnected == event.getState()) {
                connectedSemaphore.countDown();
            }
        }
    }
    

    利用sessionId ,复用连接

    /**
     * Demonstrates re-attaching to a session by handing a session id and password to the
     * five-argument {@code ZooKeeper} constructor: first with bogus credentials, then with the
     * credentials taken from a live session.
     */
    public class ZooKeeper_Constructor_Usage_With_SID_PASSWD implements Watcher {
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

        /**
         * Opens a session, records its id/password, and then reconnects twice using them.
         *
         * @param args unused
         * @throws Exception if a client cannot be created or a wait is interrupted
         */
        public static void main(String[] args) throws Exception {
            final String hosts = "domain1.book.zookeeper:2181";
            final int timeoutMs = 5000;

            ZooKeeper zookeeper =
                    new ZooKeeper(hosts, timeoutMs, new ZooKeeper_Constructor_Usage_With_SID_PASSWD());
            connectedSemaphore.await();

            // Credentials of the session that was just established.
            long liveSessionId = zookeeper.getSessionId();
            byte[] liveSessionPasswd = zookeeper.getSessionPasswd();

            // Attempt 1: an illegal session id and password; per the article this connection fails.
            zookeeper = new ZooKeeper(hosts,
                    timeoutMs,
                    new ZooKeeper_Constructor_Usage_With_SID_PASSWD(),
                    1L,
                    "test".getBytes());

            // Attempt 2: the correct id and password, which reuses the original session.
            zookeeper = new ZooKeeper(hosts,
                    timeoutMs,
                    new ZooKeeper_Constructor_Usage_With_SID_PASSWD(),
                    liveSessionId,
                    liveSessionPasswd);

            // Keep the JVM alive so the watcher output can be observed.
            Thread.sleep(Integer.MAX_VALUE);
        }

        /**
         * Logs every event and releases the latch on {@code SyncConnected}.
         *
         * @param event the event delivered by the ZooKeeper client
         */
        public void process(WatchedEvent event) {
            System.out.println("Receive watched event:" + event);
            boolean connected = KeeperState.SyncConnected == event.getState();
            if (connected) {
                connectedSemaphore.countDown();
            }
        }
    }
    

    2、创建节点

    同步方法创建节点

    /**
     * Creates two znodes with the synchronous {@code create} API: one plain ephemeral node and
     * one ephemeral-sequential node, printing the path returned for each.
     */
    public class ZooKeeper_Create_API_Sync_Usage implements Watcher {

        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

        /**
         * Connects, waits for the session, then issues both create calls.
         *
         * @param args unused
         * @throws Exception if the connection or either create call fails
         */
        public static void main(String[] args) throws Exception {
            ZooKeeper zookeeper = new ZooKeeper(
                    "domain1.book.zookeeper:2181",
                    5000,
                    new ZooKeeper_Create_API_Sync_Usage());
            connectedSemaphore.await();

            final String nodePrefix = "/zk-test-ephemeral-";

            // Plain ephemeral node: empty payload, fully open ACL.
            String ephemeralPath = zookeeper.create(
                    nodePrefix, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("Success create znode: " + ephemeralPath);

            // Arguments are (path, data, ACL, create mode); the sequential mode makes the
            // returned path differ from the requested one.
            String sequentialPath = zookeeper.create(
                    nodePrefix, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("Success create znode: " + sequentialPath);
        }

        /**
         * Releases the latch once the client reports {@code SyncConnected}.
         *
         * @param event the event delivered by the ZooKeeper client
         */
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected != event.getState()) {
                return;
            }
            connectedSemaphore.countDown();
        }
    }
    

    ZooKeeper有四种形式的目录节点,即四种CreateMode,两大类,持久化节点与临时节点,自动编号节点与非自动编号节点,两两组合,分别如下:
    1、PERSISTENT
    持久化目录节点,存储的数据不会丢失。
    2、PERSISTENT_SEQUENTIAL
    顺序自动编号的持久化目录节点,存储的数据不会丢失,并且节点名后会自动追加一个递增的序号(在已有序号的基础上加 1),然后返回给客户端已经成功创建的目录节点名。
    3、EPHEMERAL
    临时目录节点,一旦创建这个节点的客户端与服务器端的会话(session)超时或关闭,这种节点会被自动删除。
    4、EPHEMERAL_SEQUENTIAL
    临时自动编号节点,一旦创建这个节点的客户端与服务器端的会话(session)超时或关闭,这种节点会被自动删除;创建时节点名后会自动追加一个递增的序号(在已有序号的基础上加 1),然后返回给客户端已经成功创建的目录节点名。

    ZooKeeper 客户端有3种标准的ACL(下面列出的是C API中的名称,Java API中分别对应Ids.OPEN_ACL_UNSAFE、Ids.READ_ACL_UNSAFE、Ids.CREATOR_ALL_ACL):
    struct ACL_vector ZOO_OPEN_ACL_UNSAFE; //(ZOO_PERM_ALL,ZOO_ANYONE_ID_UNSAFE)
    struct ACL_vector ZOO_READ_ACL_UNSAFE;// (ZOO_PERM_READ, ZOO_ANYONE_ID_UNSAFE)
    struct ACL_vector ZOO_CREATOR_ALL_ACL; //(ZOO_PERM_ALL,ZOO_AUTH_IDS)
    ZOO_OPEN_ACL_UNSAFE使所有ACL都“开放”了:任何应用程序在节点上可进行任何操作,能创建、列出和删除它的子节点。
    ZOO_READ_ACL_UNSAFE对任何应用程序都是只读的。
    ZOO_CREATOR_ALL_ACL赋予了节点的创建者所有的权限,创建者在采用此ACL创建节点之前,必须已经通过服务器的认证。

    异步方法创建节点

    /**
     * Creates znodes with the asynchronous {@code create} API. Results are delivered to an
     * {@code IStringCallback} instead of being returned to the caller.
     */
    public class ZooKeeper_Create_API_ASync_Usage implements Watcher {

        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

        /**
         * Connects, waits for the session, then submits three asynchronous create requests.
         *
         * @param args unused
         * @throws Exception if the connection fails or the thread is interrupted
         */
        public static void main(String[] args) throws Exception {
            ZooKeeper zookeeper = new ZooKeeper(
                    "domain1.book.zookeeper:2181",
                    5000,
                    new ZooKeeper_Create_API_ASync_Usage());
            connectedSemaphore.await();

            final String nodePath = "/zk-test-ephemeral-";
            // The context object is handed back to the callback unchanged as its ctx argument.
            final String context = "I am context.";

            // First request for the ephemeral node.
            zookeeper.create(nodePath, "".getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                    new IStringCallback(), context);

            // Same path and mode again; the outcome is reported through the callback's rc.
            zookeeper.create(nodePath, "".getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                    new IStringCallback(), context);

            // Sequential variant of the same path.
            zookeeper.create(nodePath, "".getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    new IStringCallback(), context);

            // Keep the JVM alive so the callbacks have time to run.
            Thread.sleep(Integer.MAX_VALUE);
        }

        /**
         * Releases the latch once the client reports {@code SyncConnected}.
         *
         * @param event the event delivered by the ZooKeeper client
         */
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected != event.getState()) {
                return;
            }
            connectedSemaphore.countDown();
        }
    }
    
    //异步回调类
    class IStringCallback implements AsyncCallback.StringCallback{
      public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("Create path result: [" + rc + ", " + path + ", "
                       + ctx + ", real path name: " + name);
        }
    }
    

    rc : result code,服务器响应码,0接口调用成功(OK),-4客户端和服务端连接断开(ConnectionLoss),-110指定节点已经存在(NodeExists),-112会话已过期(SessionExpired)
    path : 我们传给create的path参数值。
    ctx : 我们传给create的上下文参数。
    name : 创建的znode节点名称。

    AsyncCallback异步callback,根据操作类型的不同,也分几类:
    •StringCallback
    •VoidCallback
    •StatCallback
    •DataCallback (getData请求)
    •ChildrenCallback
    •Children2Callback

    3、删除节点

    /**
     * Creates an ephemeral znode and immediately removes it with the synchronous
     * {@code delete} API.
     */
    public class Delete_API_Sync_Usage implements Watcher {

        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk;

        /**
         * Connects, creates {@code /zk-book}, deletes it, then parks the main thread.
         *
         * @param args unused
         * @throws Exception if the connection, create or delete call fails
         */
        public static void main(String[] args) throws Exception {
            final String path = "/zk-book";

            zk = new ZooKeeper(
                    "domain1.book.zookeeper:2181",
                    5000,
                    new Delete_API_Sync_Usage());
            connectedSemaphore.await();

            zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            // The second argument is the expected version; -1 matches any version.
            zk.delete(path, -1);

            Thread.sleep(Integer.MAX_VALUE);
        }

        /**
         * Releases the latch on the session-level connected event, i.e. a {@code SyncConnected}
         * event with type {@code None} and no path.
         *
         * @param event the event delivered by the ZooKeeper client
         */
        @Override
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected != event.getState()) {
                return;
            }
            boolean sessionEvent = EventType.None == event.getType() && null == event.getPath();
            if (sessionEvent) {
                connectedSemaphore.countDown();
            }
        }
    }
    

    4、读取数据、子节点

    同步读取数据

    public class GetData_API_Sync_Usage implements Watcher {
     
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk = null;
        private static Stat stat = new Stat();
     
        public static void main(String[] args) throws Exception {
     
            String path = "/zk-book";
            zk = new ZooKeeper("domain1.book.zookeeper:2181", 
                    5000, //
                    new GetData_API_Sync_Usage());
            connectedSemaphore.await();
            zk.create( path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL );
             
            //true,这里注册了上下文默认watcher,更新data之后收到通知。stat接收状态信息
            System.out.println(new String(zk.getData( path, true, stat )));
            System.out.println(stat.getCzxid()+","+stat.getMzxid()+","+stat.getVersion());
             
            zk.setData( path, "123".getBytes(), -1 );//无视事务id更新数据
             
            Thread.sleep( Integer.MAX_VALUE );
        }
       
        //分别设置连接成功和data更新的事件处理
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected == event.getState()) {
                if (EventType.None == event.getType() && null == event.getPath()) {
                    connectedSemaphore.countDown();
                } else if (event.getType() == EventType.NodeDataChanged) {
                    try {//setData之后,检测到EventType.NodeDataChanged事件
                        System.out.println(new String(zk.getData( event.getPath(), true, stat )));
                        System.out.println(stat.getCzxid()+","+
                                           stat.getMzxid()+","+
                                           stat.getVersion());
                    } catch (Exception e) {}
                }
              }
           }
    }
    

    异步读取数据,适用读取大量数据,异步获取避免影响主任务

    public class GetData_API_ASync_Usage implements Watcher {
     
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk;
     
        public static void main(String[] args) throws Exception {
     
            String path = "/zk-book";
            zk = new ZooKeeper("domain1.book.zookeeper:2181", 
                    5000, //
                    new GetData_API_ASync_Usage());
            connectedSemaphore.await();
             
            zk.create( path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL );
             
            zk.getData( path, true, new IDataCallback(), null );//不直接拿到data数据,由IDataCallback异步获取
             
            zk.setData( path, "123".getBytes(), -1 );
             
            Thread.sleep( Integer.MAX_VALUE );
        }
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected == event.getState()) {
                if (EventType.None == event.getType() && null == event.getPath()) {
                    connectedSemaphore.countDown();
                } else if (event.getType() == EventType.NodeDataChanged) {
                    try {
                      zk.getData( event.getPath(), true, new IDataCallback(), null );//检测节点变化,也用DataCallback异步获取
                    } catch (Exception e) {}
                }
              }
           }
    }
    
    //DataCallback
    class IDataCallback implements AsyncCallback.DataCallback{
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            System.out.println(rc + ", " + path + ", " + new String(data));
            System.out.println(stat.getCzxid()+","+
                                 stat.getMzxid()+","+
                               stat.getVersion());
        }
    }
    

    同步获取子节点

    public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {
     
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk = null;
         
        public static void main(String[] args) throws Exception{
     
            String path = "/zk-book";
            zk = new ZooKeeper("domain1.book.zookeeper:2181", 
                    5000, //
                    new ZooKeeper_GetChildren_API_Sync_Usage());
            connectedSemaphore.await();
            zk.create(path, "".getBytes(), 
                      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.create(path+"/c1", "".getBytes(), 
                      Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
             
            List<String> childrenList = zk.getChildren(path, true);//默认watcher,增加c2节点,收到NodeChildrenChanged通知
            System.out.println(childrenList);
             
            zk.create(path+"/c2", "".getBytes(), 
                      Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
             
            Thread.sleep( Integer.MAX_VALUE );
        }
        public void process(WatchedEvent event) {
          if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            } else if (event.getType() == EventType.NodeChildrenChanged) {
                try {
                    System.out.println("ReGet Child:"+zk.getChildren(event.getPath(),true));
                } catch (Exception e) {}
            }
          }
        }
    }
    

    异步获取子节点

    public class ZooKeeper_GetChildren_API_ASync_Usage implements Watcher {
     
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk = null;
     
        public static void main(String[] args) throws Exception{
            String path = "/zk-book";
            zk = new ZooKeeper("domain1.book.zookeeper:2181", 
                    5000, //
                    new ZooKeeper_GetChildren_API_ASync_Usage());
            connectedSemaphore.await();
            zk.create(path, "".getBytes(), 
                      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.create(path+"/c1", "".getBytes(), 
                      Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
             
            zk.getChildren(path, true, new IChildren2Callback(), null);//异步通过IChildren2Callback获取节点。
            //默认watcher,增加c2节点,收到NodeChildrenChanged通知
             
            zk.create(path+"/c2", "".getBytes(), 
                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
             
            Thread.sleep( Integer.MAX_VALUE );
        }
        public void process(WatchedEvent event) {
          if (KeeperState.SyncConnected == event.getState()) {
              if (EventType.None == event.getType() && null == event.getPath()) {
                  connectedSemaphore.countDown();
              } else if (event.getType() == EventType.NodeChildrenChanged) {
                  try {
                      System.out.println("ReGet Child:"+ zk.getChildren(path, true, new IChildren2Callback(), null));
                  } catch (Exception e) {}
              }
            }
         }
    }
    
    //Children2Callback,不同的callback,参数不同
    class IChildren2Callback implements AsyncCallback.Children2Callback{
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
            System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path
                    + ", ctx: " + ctx + ", children list: " + children + ", stat: " + stat);
        }
    }
    

    5、更新数据

    同步更新数据

    /**
     * Updates znode data with the synchronous {@code setData} API and shows how the version
     * argument affects the outcome.
     */
    public class SetData_API_Sync_Usage implements Watcher {

        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk;

        /**
         * Connects, creates {@code /zk-book}, then performs three updates: one with version -1,
         * one with the version returned by the first update, and one that reuses that same
         * (by then outdated) version.
         *
         * @param args unused
         * @throws Exception if the connection or an unguarded znode operation fails
         */
        public static void main(String[] args) throws Exception {
            final String path = "/zk-book";
            final byte[] updated = "456".getBytes();

            zk = new ZooKeeper(
                    "domain1.book.zookeeper:2181",
                    5000,
                    new SetData_API_Sync_Usage());
            connectedSemaphore.await();

            zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            zk.getData(path, true, null);

            // -1 applies the update to the latest version, i.e. no optimistic check is wanted.
            Stat afterFirst = zk.setData(path, updated, -1);
            printStat(afterFirst);

            // Update against the version that the first update returned.
            Stat afterSecond = zk.setData(path, updated, afterFirst.getVersion());
            printStat(afterSecond);

            // Reuse the version from the first update once more; the article expects this
            // update to fail.
            try {
                zk.setData(path, updated, afterFirst.getVersion());
            } catch (KeeperException e) {
                System.out.println("Error: " + e.code() + "," + e.getMessage());
            }

            Thread.sleep(Integer.MAX_VALUE);
        }

        /** Prints czxid, mzxid and version of the given stat, comma separated. */
        private static void printStat(Stat s) {
            System.out.println(s.getCzxid() + "," + s.getMzxid() + "," + s.getVersion());
        }

        /**
         * Releases the latch on the session-level connected event.
         *
         * @param event the event delivered by the ZooKeeper client
         */
        @Override
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected != event.getState()) {
                return;
            }
            boolean sessionEvent = EventType.None == event.getType() && null == event.getPath();
            if (sessionEvent) {
                connectedSemaphore.countDown();
            }
        }
    }
    

    异步更新数据

    /**
     * Updates znode data with the asynchronous {@code setData} API; the outcome is delivered
     * to an {@code IStatCallback}.
     */
    public class SetData_API_ASync_Usage implements Watcher {

        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk;

        /**
         * Connects, creates {@code /zk-book}, then submits one asynchronous update.
         *
         * @param args unused
         * @throws Exception if the connection or the create call fails
         */
        public static void main(String[] args) throws Exception {
            final String path = "/zk-book";

            zk = new ZooKeeper(
                    "domain1.book.zookeeper:2181",
                    5000,
                    new SetData_API_ASync_Usage());
            connectedSemaphore.await();

            zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            // Version -1, no context object; the result arrives in IStatCallback.
            zk.setData(path, "456".getBytes(), -1, new IStatCallback(), null);

            // Keep the JVM alive so the callback has time to run.
            Thread.sleep(Integer.MAX_VALUE);
        }

        /**
         * Releases the latch on the session-level connected event.
         *
         * @param event the event delivered by the ZooKeeper client
         */
        @Override
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected != event.getState()) {
                return;
            }
            boolean sessionEvent = EventType.None == event.getType() && null == event.getPath();
            if (sessionEvent) {
                connectedSemaphore.countDown();
            }
        }
    }
    
    /** Asynchronous callback that reports a successful {@code setData} request. */
    class IStatCallback implements AsyncCallback.StatCallback {
        /**
         * Prints {@code SUCCESS} when the result code is 0; any other code prints nothing.
         *
         * @param rc   result code returned by the server
         * @param path the path that was passed to the request
         * @param ctx  the context object that was passed to the request
         * @param stat the node status delivered with the result
         */
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            if (rc != 0) {
                return;
            }
            System.out.println("SUCCESS");
        }
    }
    

    6、检测节点是否存在

    /**
     * Uses the synchronous {@code exists} API to watch a znode through its whole life cycle:
     * creation, data change and deletion.
     */
    public class Exist_API_Sync_Usage implements Watcher {

        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        private static ZooKeeper zk;

        /**
         * Connects, registers a watch on {@code /zk-book} before it exists, then creates,
         * updates and deletes the node (and a child) so the watcher fires.
         *
         * @param args unused
         * @throws Exception if the connection or any znode operation fails
         */
        public static void main(String[] args) throws Exception {

            String path = "/zk-book";
            zk = new ZooKeeper("domain1.book.zookeeper:2181",
                    5000, //
                    new Exist_API_Sync_Usage());
            connectedSemaphore.await();

            // A watch can be registered even though the node has not been created yet.
            zk.exists( path, true );

            zk.create( path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );

            zk.setData( path, "123".getBytes(), -1 );

            // Changes to child nodes do not notify the watch registered on the parent.
            zk.create( path+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );

            zk.delete( path+"/c1", -1 );

            zk.delete( path, -1 );

            Thread.sleep( Integer.MAX_VALUE );
        }

        /**
         * Releases the latch on the session-level connected event; for created, deleted and
         * data-changed events it prints the event and re-registers the watch via
         * {@code exists}.
         *
         * @param event the event delivered by the ZooKeeper client
         */
        @Override
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected != event.getState()) {
                return;
            }
            try {
                if (EventType.None == event.getType() && null == event.getPath()) {
                    connectedSemaphore.countDown();
                } else if (EventType.NodeCreated == event.getType()) {
                    System.out.println("Node(" + event.getPath() + ")Created");
                    zk.exists( event.getPath(), true );
                } else if (EventType.NodeDeleted == event.getType()) {
                    System.out.println("Node(" + event.getPath() + ")Deleted");
                    zk.exists( event.getPath(), true );
                } else if (EventType.NodeDataChanged == event.getType()) {
                    System.out.println("Node(" + event.getPath() + ")DataChanged");
                    zk.exists( event.getPath(), true );
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Report the failure instead of silently dropping it; without this a failed
                // re-registration would make later events disappear with no explanation.
                System.err.println("Failed to re-register watch on " + event.getPath() + ": " + e);
            }
        }
    }
    

    7、权限控制

    使用无权限会话,或者错误权限会话,访问有权限的节点数据会失败

    /**
     * Access-control example: a node created with {@code CREATOR_ALL_ACL} is read by a second
     * session that supplies the same digest credentials and by a third session that supplies
     * different ones.
     */
    public class AuthSample_Get2 {

        final static String PATH = "/zk-book-auth_test";

        /**
         * Creates the protected node and reads it from two further sessions.
         *
         * @param args unused
         * @throws Exception if a connection or znode operation fails; per the article, the
         *         final read with the wrong credentials is expected to fail
         */
        public static void main(String[] args) throws Exception {

            // Session 1: authenticate as foo:true and create the node for its creator only.
            ZooKeeper zookeeper1 = new ZooKeeper("domain1.book.zookeeper:2181",5000,null);
            zookeeper1.addAuthInfo("digest", "foo:true".getBytes());
            zookeeper1.create( PATH, "init".getBytes(), //
                               Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL );

            // Session 2: same credentials as the creator.
            ZooKeeper zookeeper2 = new ZooKeeper("domain1.book.zookeeper:2181",50000,null);
            zookeeper2.addAuthInfo("digest", "foo:true".getBytes());
            // Decode the bytes before printing; println(byte[]) would only print the array's
            // identity (e.g. "[B@1b6d3586"), not the node content.
            System.out.println(new String(zookeeper2.getData( PATH, false, null )));

            // Session 3: different credentials from the creator.
            ZooKeeper zookeeper3 = new ZooKeeper("domain1.book.zookeeper:2181",50000,null);
            zookeeper3.addAuthInfo("digest", "foo:false".getBytes());
            zookeeper3.getData( PATH, false, null );
        }
    }
    

    删除节点的权限控制:删除操作的权限由父节点的ACL控制,因此删除受保护节点的子节点需要权限,而删除该节点本身不需要权限

    /**
     * Access-control example for {@code delete}: a protected parent and child are created,
     * then deletion is attempted from sessions with and without credentials.
     */
    public class AuthSample_Delete {

        final static String PATH  = "/zk-book-auth_test";
        final static String PATH2 = "/zk-book-auth_test/child";

        /**
         * Runs the create / delete sequence.
         *
         * @param args unused
         * @throws Exception if a connection or an unguarded znode operation fails
         */
        public static void main(String[] args) throws Exception {
            final String hosts = "domain1.book.zookeeper:2181";
            final byte[] credentials = "foo:true".getBytes();

            // Authenticated session that creates both nodes for their creator only.
            ZooKeeper zookeeper1 = new ZooKeeper(hosts, 5000, null);
            zookeeper1.addAuthInfo("digest", credentials);
            zookeeper1.create(PATH, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
            zookeeper1.create(PATH2, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);

            // No credentials: any failure while connecting or deleting the child is printed.
            try {
                ZooKeeper zookeeper2 = new ZooKeeper(hosts, 50000, null);
                zookeeper2.delete(PATH2, -1);
            } catch (Exception e) {
                System.out.println("删除节点失败: " + e.getMessage());
            }

            // Creator's credentials: delete the child.
            ZooKeeper zookeeper3 = new ZooKeeper(hosts, 50000, null);
            zookeeper3.addAuthInfo("digest", credentials);
            zookeeper3.delete(PATH2, -1);
            System.out.println("成功删除节点:" + PATH2);

            // No credentials again: delete the protected node itself.
            ZooKeeper zookeeper4 = new ZooKeeper(hosts, 50000, null);
            zookeeper4.delete(PATH, -1);
            System.out.println("成功删除节点:" + PATH);
        }
    }
    

    相关文章

      网友评论

          本文标题:ZooKeeper系列之(3)——API使用

          本文链接:https://www.haomeiwen.com/subject/zvodxxtx.html