美文网首页
ZooKeeper(四)ZooKeeper客户端Java zkC

ZooKeeper(四)ZooKeeper客户端Java zkC

作者: 7ColorLotus | 来源:发表于2020-06-24 14:34 被阅读0次
    • zkClient:开源的zk客户端,在原生API基础上封装,是一个更易于使用的zookeeper客户端
      1. 创建会话(同步,重试)
      2. 创建节点(同步,递归创建)
      3. 删除节点(同步,递归删除)
      4. 代码示例
        import net.lc7.model.User;
        import net.lc7.util.ZkPropertiesUtil;
        import org.I0Itec.zkclient.IZkChildListener;
        import org.I0Itec.zkclient.IZkDataListener;
        import org.I0Itec.zkclient.ZkClient;
        import org.I0Itec.zkclient.serialize.SerializableSerializer;
        import org.apache.zookeeper.CreateMode;
        import org.apache.zookeeper.data.Stat;
      
        import java.util.List;
      
        /**
         * Demonstrates basic node operations with the zkClient library: creating, reading,
         * updating, deleting and watching a node and its children under {@code /user}.
         *
         * <p>{@link #main} opens one session using the settings from {@code ZkPropertiesUtil}
         * and runs a single demo operation; replace the call inside {@code main} to try
         * another one.
         *
         * @Author: Jason.zhu
         * @Create: 2019/05/22 17:37
         */

        public class ZkClientMain {
            private static ZkClient zkClient;


            /**
             * Opens a session, runs the selected demo operation and always closes the session,
             * even when the operation throws.
             *
             * @param args unused
             * @throws Exception whatever the selected demo operation throws
             */
            public static void main(String[] args) throws Exception{
                String ipPort = ZkPropertiesUtil.getZkServerIp();
                int sessionTimeout = 10000;
                int connectionTimeout = ZkPropertiesUtil.getZktimeout();
                zkClient = new ZkClient(ipPort,sessionTimeout,connectionTimeout,new SerializableSerializer());

                try {
                    updateNodeData();
                } finally {
                    // Release the session instead of leaving it to expire on the server.
                    zkClient.close();
                }
            }

            /**
             * Creates the persistent node {@code /user} holding a serialized {@code User} and
             * prints the path returned by the client.
             */
            private static void createNode(){
                User user = User.builder().age(18).id(1).name("jason").build();
                String node = zkClient.create("/user",user,CreateMode.PERSISTENT);
                System.out.println(node);
            }

            /**
             * Reads the {@code User} stored at {@code /user} and prints its name together with
             * the {@link Stat} filled in by the read.
             */
            private static void getNodeData(){
                Stat stat = new Stat();
                User user1 = zkClient.readData("/user", stat);
                System.out.println("name : " + user1.getName() );
                System.out.println("stat : " + stat);
            }

            /**
             * Creates three persistent child nodes {@code /user/1} .. {@code /user/3} and prints
             * each created path.
             */
            private static void createChildrenNode(){
                String node = zkClient.create("/user/1","user1",CreateMode.PERSISTENT);
                System.out.println(node);
                node = zkClient.create("/user/2","user2",CreateMode.PERSISTENT);
                System.out.println(node);
                node = zkClient.create("/user/3","user3",CreateMode.PERSISTENT);
                System.out.println(node);
            }

            /**
             * Prints the names of the children of {@code /user}, if that node exists.
             */
            private static void getChildNode(){
                String node = "/user";
                if(zkClient.exists(node)){
                    List<String> childNodes = zkClient.getChildren(node);
                    childNodes.forEach(System.out::println);
                }
            }

            /**
             * Deletes {@code /user} together with all of its children, if the node exists.
             */
            private static void delNode(){
                String node = "/user";
                if(zkClient.exists(node)){
                    // deleteRecursive removes the children first; a plain delete(node) cannot
                    // remove a node that still has children.
                    zkClient.deleteRecursive(node);
                }
            }

            /**
             * Overwrites the data of {@code /user} with a new {@code User}.
             * NOTE(review): presumably requires {@code /user} to exist already (see
             * {@link #createNode()}) - confirm against the zkClient documentation.
             */
            private static void updateNodeData(){
                User user = User.builder().age(11).name("wangwu").id(12).build();
                String node = "/user";
                zkClient.writeData(node, user);

            }

            /**
             * Subscribes to child changes of {@code /user}, printing the parent path and the
             * current child list on every notification, then blocks the calling thread.
             *
             * @throws InterruptedException if the blocking sleep is interrupted
             */
            private static void watchChildChange() throws InterruptedException {
                String node = "/user";
                zkClient.subscribeChildChanges(node, (parentPath,currentChildren) -> {
                    System.out.println(parentPath);
                    System.out.println(currentChildren);
                });
                Thread.sleep(Long.MAX_VALUE);
            }

            /**
             * Subscribes to data changes and deletion of {@code /user}, printing each event,
             * then blocks the calling thread until it is interrupted.
             */
            private static void watchNode(){
                String node ="/user";
                zkClient.subscribeDataChanges(node, new IZkDataListener() {
                    @Override
                    public void handleDataChange(String s, Object o) throws Exception {
                        System.out.println("node name : " + s + ",updated !!");
                        User user = (User)o;
                        System.out.println("node new value : " + user);
                    }

                    @Override
                    public void handleDataDeleted(String s) throws Exception {
                        System.out.println("node name : " + s + ", deleted !!");
                    }
                });

                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can still observe the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }

        }
      
      import lombok.Builder;
      import lombok.Data;
      
      import java.io.Serializable;
      
      @Data
      @Builder
      public class User implements Serializable {
            private int id;
            private int age;
            private String name;

            /**
             * Renders the three fields as {@code id=<id>, age=<age>,name=<name>}.
             *
             * @return the textual form of this user
             */
            @Override
            public String toString(){
                StringBuilder rendered = new StringBuilder();
                rendered.append("id=").append(id);
                rendered.append(", age=").append(age);
                rendered.append(",name=").append(name);
                return rendered.toString();
           }
      }
      
    • curator:开源的zk客户端,在原生API基础上封装,apache顶级项目
      1. Curator采用Fluent风格API
      2. Curator对zk进行基本操作代码示例:
      import net.lc7.util.ZkPropertiesUtil;
      import org.apache.curator.framework.CuratorFramework;
      import org.apache.curator.framework.CuratorFrameworkFactory;
      import org.apache.curator.retry.RetryNTimes;
      
      /**
       * Basic ZooKeeper operations through Curator's fluent API: create a node, list the
       * root, read the node, update it, read it again, delete it and list the root again.
       *
       * @Author: Jason.zhu
       * @Create: 2019/05/24 17:55
       */
      public class CuratorClient {
            private static CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
                    ZkPropertiesUtil.getZkServerIp(),
                    new RetryNTimes(10, 5000));

            private static String path = "/curator_node";

            /**
             * Runs the create / read / update / delete walkthrough against {@code /curator_node}
             * and closes the client afterwards, even when one of the operations fails.
             *
             * @param args unused
             * @throws Exception if any ZooKeeper operation fails
             */
            public static void main(String[] args) throws Exception {
                curatorFramework.start();
                try {
                    //create node
                    String data = "curator_data";
                    // Encode explicitly so the stored bytes do not depend on the platform charset.
                    curatorFramework.create().creatingParentsIfNeeded()
                            .forPath(path, data.getBytes(java.nio.charset.StandardCharsets.UTF_8));

                    //get Node and Data
                    print("ls", "/");
                    print(curatorFramework.getChildren().forPath("/"));

                    print("get", path);
                    print(curatorFramework.getData().forPath(path));

                    //update node data
                    String dataNew = "curator_data_new";
                    print("set", path, dataNew);
                    curatorFramework.setData()
                            .forPath(path, dataNew.getBytes(java.nio.charset.StandardCharsets.UTF_8));

                    print("get", path);
                    print(curatorFramework.getData().forPath(path));

                    //remove node
                    print("delete", path);
                    curatorFramework.delete().forPath(path);
                    print("ls", "/");
                    print(curatorFramework.getChildren().forPath("/"));
                } finally {
                    // Previously skipped whenever one of the calls above threw.
                    curatorFramework.close();
                }
            }

            /**
             * Prints the given words as a shell-like command line: {@code "$ "} followed by
             * every word, each followed by a single space.
             *
             * @param cmds the command and its arguments
             */
            private static void print(String... cmds) {
                StringBuilder text = new StringBuilder("$ ");
                for (String cmd : cmds) {
                    text.append(cmd).append(" ");
                }
                System.out.println(text.toString());
            }

            /**
             * Prints an operation result; byte arrays are decoded as UTF-8 text, anything else
             * is printed as-is.
             *
             * @param result the value returned by a Curator call
             */
            private static void print(Object result) {
                System.out.println(
                        result instanceof byte[]
                                ? new String((byte[]) result, java.nio.charset.StandardCharsets.UTF_8)
                                : result);
            }

      }
      
        /**
         * Reads ZooKeeper connection settings from the classpath resource
         * {@code /zk.properties}, loaded once when the class is initialised.
         *
         * <p>Loading is best-effort: if the resource is missing or unreadable a diagnostic is
         * printed and the property set stays empty, so the getters then report missing values.
         */
        public class ZkPropertiesUtil {

            private static final Properties zkProperties = new Properties();

            static {
                // try-with-resources closes the stream; a null resource is skipped by the language.
                try (InputStream is = ZkPropertiesUtil.class.getResourceAsStream("/zk.properties")) {
                    if (is == null) {
                        // Report the real cause instead of letting load(null) raise an NPE.
                        System.err.println("zk.properties not found on the classpath");
                    } else {
                        zkProperties.load(is);
                    }
                } catch (IOException | IllegalArgumentException e) {
                    // IllegalArgumentException: malformed Unicode escape in the properties file.
                    e.printStackTrace();
                }
            }

            /**
             * Returns the value of the {@code zk.zkServerIps} property.
             *
             * @return the configured value, or {@code null} when the property is absent
             */
            public static String getZkServerIp(){
                return zkProperties.getProperty("zk.zkServerIps");
            }

            /**
             * Returns the value of the {@code zk.timeout} property as an int.
             *
             * @return the configured timeout
             * @throws NumberFormatException if the property is absent or is not a valid integer
             */
            public static int getZktimeout(){
                String raw = zkProperties.getProperty("zk.timeout");
                if (raw == null) {
                    throw new NumberFormatException("Missing required property 'zk.timeout'");
                }
                // Properties keeps trailing whitespace in values, which parseInt would reject.
                return Integer.parseInt(raw.trim());
            }

            /**
             * Scratch entry point: prints {@code "0101888"} without its last two characters.
             * It does not touch the ZooKeeper settings.
             *
             * @param args unused
             */
            public static void main(String[] args) {
                String source = "0101888";

                System.out.println(source.substring(0, source.length() - 2));
            }
        }
      
      3. Curator操作zk实现分布式锁
        import net.lc7.util.ZkPropertiesUtil;
        import org.apache.curator.framework.CuratorFramework;
        import org.apache.curator.framework.CuratorFrameworkFactory;
        import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
        import org.apache.curator.framework.recipes.locks.InterProcessMutex;
        import org.apache.curator.retry.RetryNTimes;
      
        import java.util.concurrent.TimeUnit;
      
        /**
         * Distributed lock on top of ZooKeeper using Curator's {@link InterProcessMutex}:
         * two threads share one client and compete for the same lock path.
         *
         * @Author: Jason.zhu
         * @Create: 2019/05/27 11:28
         */

        public class CuratorDistributeLock {
            private static String lockPath = "/lockPath";

            /**
             * Starts a client, lets threads {@code t1} and {@code t2} compete for the lock,
             * waits for both to finish and then closes the client.
             *
             * @param args unused
             * @throws Exception if the main thread is interrupted while waiting
             */
            public static void main(String[] args) throws Exception{
                CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZkPropertiesUtil.getZkServerIp(),
                                                                new RetryNTimes(10,5000));
                curatorFramework.start();
                try {
                    System.out.println("zk client start successfully!!");
                    Thread.sleep(1000L);

                    Thread t1 = new Thread(() -> doWithLock(curatorFramework), "t1");
                    Thread t2 = new Thread(() -> doWithLock(curatorFramework), "t2");

                    t1.start();
                    t2.start();

                    // Wait for both workers so the client is not closed underneath them.
                    t1.join();
                    t2.join();
                } finally {
                    curatorFramework.close();
                }
            }

            /**
             * Acquires the shared lock, holds it for five seconds and releases it.
             * The lock is released only if this thread actually obtained it.
             *
             * @param curatorFramework a started client used to create the mutex
             */
            private static void doWithLock(CuratorFramework curatorFramework){
                System.out.println("Client state : " + curatorFramework.getState());
                InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);

                boolean acquired = false;
                try {
                    // 10*1000 was paired with SECONDS (about 2.8 hours); the value reads as
                    // milliseconds, so the unit is corrected to give a 10 second wait.
                    acquired = lock.acquire(10*1000, TimeUnit.MILLISECONDS);
                    if(acquired){
                        System.out.println(Thread.currentThread().getName() + "hold lock!!");
                        Thread.sleep(5000L);
                        System.out.println(Thread.currentThread().getName() + "release lock!!");
                    }else {
                        System.err.println(Thread.currentThread().getName() + " timed out waiting for lock");
                    }
                }catch (InterruptedException e){
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    // Releasing a mutex this thread does not own throws, so guard on ownership.
                    if(acquired){
                        try {
                            lock.release();
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
      
      4. Curator操作zk实现Leader选举
        import net.lc7.util.ZkPropertiesUtil;
        import org.apache.curator.framework.CuratorFramework;
        import org.apache.curator.framework.CuratorFrameworkFactory;
        import org.apache.curator.framework.recipes.leader.LeaderSelector;
        import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
        import org.apache.curator.framework.state.ConnectionState;
        import org.apache.curator.retry.RetryNTimes;
        import org.apache.curator.utils.EnsurePath;
      
        /**
         * @Description: Leader选举
         * 当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。
         * Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。
         * 注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。
         * autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
         * @Author: Jason.zhu
         * @Create: 2019/05/27 11:49
         */
      
        public class CuratorLeaderClient {
            private static String path = "/ensurePath";
      
            public static void main(String[] args) throws InterruptedException {
                LeaderSelectorListener listener = new LeaderSelectorListener() {
                    @Override
                    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                        System.out.println(Thread.currentThread().getName() + " take leadership !!");
                        Thread.sleep(5000L);
                        System.out.println(Thread.currentThread().getName() + " relinquish leadership !!");
                    }
      
                    @Override
                    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                        System.out.println(connectionState.name() + " state changed !! " + connectionState.isConnected());
                    }
                };
      
                new Thread(() -> {
                    registerListener(listener);
                }).start();
      
                new Thread(() -> {
                    registerListener(listener);
                }).start();
      
      
                new Thread(() -> {
                    registerListener(listener);
                }).start();
      
      
                new Thread(() -> {
                    registerListener(listener);
                }).start();
      
                new Thread(() -> {
                    registerListener(listener);
                }).start();
      
                Thread.sleep(Integer.MAX_VALUE);
            }
      
            public static void registerListener(LeaderSelectorListener listener){
                CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZkPropertiesUtil.getZkServerIp()
                                                                            ,new RetryNTimes(10, 5000));
      
                curatorFramework.start();
      
                //ensure path
                try {
                    new EnsurePath(path).ensure(curatorFramework.getZookeeperClient());
                } catch (Exception e) {
                    e.printStackTrace();
                }
      
                //register listener
                LeaderSelector selector = new LeaderSelector(curatorFramework, path, listener);
                selector.autoRequeue();
                selector.start();
            }
      
        }
      
      5. Curator操作ZK实现监听功能
        import net.lc7.util.ZkPropertiesUtil;
        import org.apache.curator.framework.CuratorFramework;
        import org.apache.curator.framework.CuratorFrameworkFactory;
        import org.apache.curator.framework.recipes.cache.ChildData;
        import org.apache.curator.framework.recipes.cache.PathChildrenCache;
        import org.apache.curator.retry.RetryNTimes;
      
        /**
         * Watching ZooKeeper nodes with Curator caches.
         *
         * <p>Path Cache: watches a path for 1) creation and 2) deletion of child nodes and
         * 3) updates of their data; events are delivered to the registered
         * PathChildrenCacheListener.<br>
         * Node Cache: watches creation, update and deletion of a single node and caches its
         * data locally.<br>
         * Tree Cache: a combination of Path Cache and Node Cache; watches create, update and
         * delete events under a path and caches the data of all child nodes.
         *
         * @Author: Jason.zhu
         * @Create: 2019/05/27 11:10
         */

        public class CuratorWatchClient {

            private static String watcherNode = "/watcherNode";

            /**
             * Registers a {@link PathChildrenCache} listener on {@code /watcherNode}, prints
             * every received event and blocks; the cache and the client are closed when the
             * method exits.
             *
             * @param args unused
             * @throws Exception if starting the cache fails or the sleep is interrupted
             */
            public static void main(String[] args) throws Exception {
                try (CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory.newClient(
                        ZkPropertiesUtil.getZkServerIp(),
                        new RetryNTimes(10, 5000))) {
                    curatorFrameworkClient.start();

                    System.out.println("zk client start successfully!!");

                    //register watcher
                    try (PathChildrenCache pathChildrenCache =
                                 new PathChildrenCache(curatorFrameworkClient, watcherNode, true)) {
                        pathChildrenCache.getListenable().addListener((client, event) -> {
                            ChildData data = event.getData();
                            if(null == data){
                                System.out.println("no data in event [" + event +"]");
                            }else {
                                System.out.println(
                                        "Receive event :" +
                                                "type=["+ event.getType() +"], " +
                                                "path=[" + data.getPath() + "]," +
                                                "data=["+ decode(data.getData()) +"]," +
                                                "state=[" + data.getStat() + "]"
                                );
                            }
                        });
                        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                        System.out.println(" Register zookeeper watcher successfully!! Please operate in terminal to show the listener function");
                        Thread.sleep(Integer.MAX_VALUE);
                    }
                }
            }

            /**
             * Turns a node payload into printable text. Concatenating the raw {@code byte[]}
             * would print the array's identity rather than its content.
             *
             * @param payload the node data, possibly {@code null}
             * @return the payload decoded as UTF-8, or {@code "null"} when there is no payload
             */
            private static String decode(byte[] payload) {
                if (payload == null) {
                    return "null";
                }
                return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
            }
        }
      

    相关文章

      网友评论

          本文标题:ZooKeeper(四)ZooKeeper客户端Java zkC

          本文链接:https://www.haomeiwen.com/subject/wqurfktx.html