美文网首页
zookeeper 客户端使用

zookeeper 客户端使用

作者: 香沙小熊 | 来源:发表于2020-02-23 16:26 被阅读0次

    1.原生

    public class ZKWatch implements Watcher {
        private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    
        private ZooKeeper zooKeeper;
    
        public ZKWatch() {
            try {
                zooKeeper = new ZooKeeper(CONNECT_ADDR, 5000, this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    
        public static void main(String[] args) throws KeeperException, InterruptedException {
            ZKWatch zkWatch = new ZKWatch();
    
            if(zkWatch.exists("/xionghu",true)!=null){
                zkWatch.deleteRecursive("/xionghu");
            }
            zkWatch.addPZnode("/xionghu", "2020");
            //zkWatch.addPZnode("/xionghu/aaa", "2019");
    
    
    
            Thread.sleep(2000000);
    
        }
    
        public Stat exists(String path,boolean watch) throws KeeperException, InterruptedException {
            return zooKeeper.exists(path, watch);
    
        }
    
        public void delete(String path) throws KeeperException, InterruptedException {
            zooKeeper.delete(path, -1);
        }
    
        public void deleteRecursive(String path) throws KeeperException, InterruptedException {
            ZKUtil.deleteRecursive(zooKeeper, path);
        }
    
        /**
         * 创建znode结点
         *
         * @param path 结点路径
         * @param data 结点数据
         * @return true 创建结点成功 false表示结点存在
         * @throws Exception
         */
        public boolean addZnodeData(String path, String data, CreateMode mode) {
            try {
                if (zooKeeper.exists(path, true) == null) {
                    zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
                    return true;
                }
            } catch (KeeperException | InterruptedException e) {
                throw new RuntimeException("创建znode:" + path + "出现问题!!", e);
            }
            System.out.println("znode" + path + "结点已存在");
            return false;
        }
    
        public boolean addZnodeData(String path, String data) {
            try {
                zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return true;
            } catch (Exception e) {
                //logger.error("【创建持久化节点异常】{},{},{}",path,data,e);
                return false;
            }
        }
    
        /**
         * 创建永久znode结点
         *
         * @param path 结点路径
         * @param data 结点数据
         * @return true 创建结点成功 false表示结点存在
         * @throws Exception
         */
        public boolean addPZnode(String path, String data) {
            return addZnodeData(path, data, CreateMode.PERSISTENT);
        }
    
        /**
         * 创建临时znode结点
         *
         * @param path 结点路径
         * @param data 结点数据
         * @return true 创建结点成功 false表示结点存在
         * @throws Exception
         */
        public boolean addZEnode(String path, String data) {
            return addZnodeData(path, data, CreateMode.EPHEMERAL);
        }
    
        /**
         * 修改znode
         *
         * @param path 结点路径
         * @param data 结点数据
         * @return 修改结点成功   false表示结点不存在
         */
        public boolean updateZnode(String path, String data) {
            try {
                Stat stat = null;
                if ((stat = zooKeeper.exists(path, true)) != null) {
                    zooKeeper.setData(path, data.getBytes(), stat.getVersion());
                    return true;
                }
            } catch (KeeperException | InterruptedException e) {
                throw new RuntimeException("修改znode:" + path + "出现问题!!", e);
            }
            return false;
        }
    
        /**
         * 删除结点
         *
         * @param path 结点
         * @return true 删除键结点成功  false表示结点不存在
         */
        public boolean deleteZnode(String path) {
            try {
                Stat stat = null;
                if ((stat = zooKeeper.exists(path, true)) != null) {
                    List<String> subPaths = zooKeeper.getChildren(path, false);
                    if (subPaths.isEmpty()) {
                        zooKeeper.delete(path, stat.getVersion());
                        return true;
                    } else {
                        for (String subPath : subPaths) {
                            deleteZnode(path + "/" + subPath);
                        }
                    }
                }
            } catch (InterruptedException | KeeperException e) {
                throw new RuntimeException("删除znode:" + path + "出现问题!!", e);
            }
            return false;
        }
    
        /**
         * 取到结点数据
         *
         * @param path 结点路径
         * @return null表示结点不存在 否则返回结点数据
         */
        public String getZnodeData(String path) {
            String data = null;
            try {
                Stat stat = null;
                if ((stat = zooKeeper.exists(path, true)) != null) {
                    data = new String(zooKeeper.getData(path, true, stat));
                } else {
                    System.out.println("znode:" + path + ",不存在");
                }
            } catch (KeeperException | InterruptedException e) {
                throw new RuntimeException("取到znode:" + path + "出现问题!!", e);
            }
            return data;
        }
    
    
    
        @Override
        public void process(WatchedEvent watchedEvent) {
    
            System.out.println("连接状态:"+watchedEvent.getState()+"   "+ "时间类型 "+watchedEvent.getType()+
               "受影响的path"+ watchedEvent.getPath());
        }
    
    }
    

    2.ZkClient

    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version></version>
    </dependency>
    
    public class ZkClientCrud<T> {
        private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    
        private ZkClient zkClient;
    
        public ZkClientCrud() {
            this.zkClient = new ZkClient(CONNECT_ADDR, 5000, 5000, new SerializableSerializer());
        }
    
        /**
         * 创建持久节点
         */
        public void createPersistent(String path, Object data) {
            zkClient.createPersistent(path, data);
        }
    
        public T readData(String path) {
            return zkClient.readData(path);
        }
    
        //递归删除
        public void deleteRecursive(String path) {
            zkClient.deleteRecursive(path);
    
        }
    
        public void delete(String path) {
            zkClient.delete(path);
    
        }
    
        /***
         * 子节点
         * @param path
         * @return
         */
        public List<String> getChildren(String path){
            return zkClient.getChildren(path);
    
        }
    }
    
    public class ZkClientWatcher {
        private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    
        ZkClient zkClient;
    
        public ZkClientWatcher() {
            this.zkClient = new ZkClient(CONNECT_ADDR, 5000, 5000, new SerializableSerializer());
        }
    
    
        public void createPersistent(String path, Object data) {
            zkClient.createPersistent(path, data);
        }
    
    
        public void writeData(String path, Object object) {
            zkClient.writeData(path, object);
    
        }
    
        public void delete(String path) {
            zkClient.delete(path);
    
        }
    
        public boolean exists(String path) {
            return zkClient.exists(path);
    
        }
    
        public void deleteRecursive(String path) {
            zkClient.deleteRecursive(path);
    
        }
    
        //对父节点添加监听数据变化。
        public void subscribe(String path) {
    
    
            zkClient.subscribeDataChanges(path, new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath, data);
                }
    
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.printf("删除的节点为:%s\r\n", dataPath);
                }
            });
        }
    
        //对父节点添加监听子节点变化。
        public void subscribe2(String path) {
            zkClient.subscribeChildChanges(path, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    System.out.println("父节点: " + parentPath + ",子节点:" + currentChilds + "\r\n");
                }
            });
        }
    
    
        //客户端状态
        public void subscribe3(String path) {
            zkClient.subscribeStateChanges(new IZkStateListener() {
                @Override
                public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                    if (state == Watcher.Event.KeeperState.SyncConnected) {
                        //当我重新启动后start,监听触发
                        System.out.println("连接成功");
                    } else if (state == Watcher.Event.KeeperState.Disconnected) {
                        System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
                    } else {
                        System.out.println("其他状态" + state);
                    }
                }
    
                @Override
                public void handleNewSession() throws Exception {
                    System.out.println("重建session");
    
                }
    
                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
    
                }
            });
    

    3.Curator

    import com.google.common.base.Charsets;
    import com.google.common.base.Objects;
    import com.google.common.base.Strings;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.CuratorWatcher;
    import org.apache.curator.framework.api.GetChildrenBuilder;
    import org.apache.curator.framework.api.GetDataBuilder;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.utils.ZKPaths;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * @date: 2020/2/23 15:25
     * @author: xionghu
     * @desc:
     */
    public  class CuratorUtil {
        private CuratorFramework client;
    
    
        public CuratorUtil(String zkAddress) {
            client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new NodeEventListener());
            client.start();
        }
    
    
        /**
         * 创建node
         *
         * @param nodeName
         * @param value
         * @return
         */
        public boolean createNode(String nodeName, String value) {
            boolean suc = false;
            try {
                Stat stat = getClient().checkExists().forPath(nodeName);
                if (stat == null) {
                    String opResult = null;
                    if (Strings.isNullOrEmpty(value)) {
                        opResult = getClient().create().creatingParentsIfNeeded().forPath(nodeName);
                    }
                    else {
                        opResult =
                                getClient().create().creatingParentsIfNeeded()
                                        .forPath(nodeName, value.getBytes(Charsets.UTF_8));
                    }
                    suc = Objects.equal(nodeName, opResult);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            return suc;
        }
    
    
        /**
         * 更新节点
         *
         * @param nodeName
         * @param value
         * @return
         */
        public boolean updateNode(String nodeName, String value) {
            boolean suc = false;
            try {
                Stat stat = getClient().checkExists().forPath(nodeName);
                if (stat != null) {
                    Stat opResult = getClient().setData().forPath(nodeName, value.getBytes(Charsets.UTF_8));
                    suc = opResult != null;
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            return suc;
        }
    
    
        /**
         * 删除节点
         *
         * @param nodeName
         */
        public void deleteNode(String nodeName) {
            try {
                getClient().delete().deletingChildrenIfNeeded().forPath(nodeName);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 找到指定节点下所有子节点的名称与值
         *
         * @param node
         * @return
         */
        public Map<String, String> listChildrenDetail(String node) {
            Map<String, String> map = Maps.newHashMap();
            try {
                GetChildrenBuilder childrenBuilder = getClient().getChildren();
                List<String> children = childrenBuilder.forPath(node);
                GetDataBuilder dataBuilder = getClient().getData();
                if (children != null) {
                    for (String child : children) {
                        String propPath = ZKPaths.makePath(node, child);
                        map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8));
                    }
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            return map;
        }
    
    
        /**
         * 列出子节点的名称
         *
         * @param node
         * @return
         */
        public List<String> listChildren(String node) {
            List<String> children = Lists.newArrayList();
            try {
                GetChildrenBuilder childrenBuilder = getClient().getChildren();
                children = childrenBuilder.forPath(node);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            return children;
        }
    
    
        /**
         * 增加监听
         *
         * @param node
         * @param isSelf
         *            true 为node本身增加监听 false 为node的子节点增加监听
         * @throws Exception
         */
        public void addWatch(String node, boolean isSelf) throws Exception {
            if (isSelf) {
                getClient().getData().watched().forPath(node);
            }
            else {
                getClient().getChildren().watched().forPath(node);
            }
        }
    
    
        /**
         * 增加监听
         *
         * @param node
         * @param isSelf
         *            true 为node本身增加监听 false 为node的子节点增加监听
         * @param watcher
         * @throws Exception
         */
        public void addWatch(String node, boolean isSelf, Watcher watcher) throws Exception {
            if (isSelf) {
                getClient().getData().usingWatcher(watcher).forPath(node);
            }
            else {
                getClient().getChildren().usingWatcher(watcher).forPath(node);
            }
        }
    
    
        /**
         * 增加监听
         *
         * @param node
         * @param isSelf
         *            true 为node本身增加监听 false 为node的子节点增加监听
         * @param watcher
         * @throws Exception
         */
        public void addWatch(String node, boolean isSelf, CuratorWatcher watcher) throws Exception {
            if (isSelf) {
                getClient().getData().usingWatcher(watcher).forPath(node);
            }
            else {
                getClient().getChildren().usingWatcher(watcher).forPath(node);
            }
        }
    
    
        /**
         * 销毁资源
         */
        public void destory() {
            if (client != null) {
                client.close();
            }
        }
    
    
        /**
         * 获取client
         *
         * @return
         */
        public CuratorFramework getClient() {
            return client;
        }
    
    }
    
    final class NodeEventListener implements CuratorListener {
        @Override
        public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
            System.out.println(event.toString() + ".......................");
            final WatchedEvent watchedEvent = event.getWatchedEvent();
            if (watchedEvent != null) {
                System.out.println(watchedEvent.getState() + "=======================" + watchedEvent.getType());
                if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    switch (watchedEvent.getType()) {
                        case NodeChildrenChanged:
    
                            break;
                        case NodeDataChanged:
                            // TODO
                            break;
                        default:
                            break;
                    }
                }
            }
        }
    }
    
    public class CuratorTest {
        private static final String CONNECT_ADDR = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    
    
    
        public static void main(String[] args) throws Exception {
            CuratorUtil curator = new CuratorUtil(CONNECT_ADDR);
            curator.createNode("/root/test1", "abc1");
            curator.createNode("/root/test2", "abc2");
            curator.updateNode("/root/test2", "abc3");
            List<String> list = curator.listChildren("/root");
            Map<String, String> map = curator.listChildrenDetail("/root");
            // curator.deleteNode("/zkroot");
            // curator.destory();
            System.out.println("=========================================");
            for (String str : list) {
                System.out.println(str);
            }
    
            System.out.println("=========================================");
            for (Map.Entry<String, String> entry : map.entrySet()) {
                System.out.println(entry.getKey() + "=>" + entry.getValue());
            }
    
            // 增加监听
            curator.addWatch("/root", false);
    
            TimeUnit.SECONDS.sleep(600);
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:zookeeper 客户端使用

          本文链接:https://www.haomeiwen.com/subject/gqmxqhtx.html