美文网首页
4. zk客户端

4. zk客户端

作者: 指尖架构141319 | 来源:发表于2019-10-29 14:50 被阅读0次

1. 引入依赖

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.12</version>
    </dependency>
    <!--zkclient-->
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>
    <!--curator-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>
</dependencies>

2. 客户端

2.1 原生
  • 获取session两种方式
public class TestCreateSession {
    //ZooKeeper服务地址
    private static final String SERVER = "172.16.223.132:2181";

    //会话超时时间
    private final int SESSION_TIMEOUT = 30000;

    @Test
    /**
     * 获得session的方式,这种方式可能会在ZooKeeper还没有获得连接的时候就已经对ZK进行访问了
     */
    public  void testSession1() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper(SERVER,SESSION_TIMEOUT,null);
        System.out.println(zooKeeper);
        System.out.println(zooKeeper.getState());
        zooKeeper.create("/fk","fkvalue".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
//结果:
//State:CONNECTING sessionid:0x0 local:null remoteserver:null lastZxid:0 xid:1 sent:0 recv:0 queuedpkts:0 pendingresp:0 queuedevents:0
//CONNECTING   
//这种方式可能会在ZooKeeper还没有获得连接的时候就已经对ZK进行访问了


    //发令枪
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    @Test
    /**
     * 对获得Session的方式进行优化,在ZooKeeper初始化完成以前先等待,等待完成后再进行后续操作
     */
    public  void testSession2() throws  Exception {
        ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if(watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    //确认已经连接完毕后再进行操作
                    countDownLatch.countDown();
                    System.out.println("已经获得了连接");
                }
            }
        });

        //连接完成之前先等待
        countDownLatch.await();
        System.out.println(zooKeeper.getState());
    }

//结果:
//已经获得了连接
//CONNECTED
//在ZooKeeper初始化完成以前先等待,等待完成后再进行后续操作
  • 原生增删改查
public class TestJavaApi implements Watcher {

    private static final int SESSION_TIMEOUT = 10000;
    private static final String CONNECTION_STRING = "172.16.223.132:2181";
    private static final String ZK_PATH = "/leader";
    private ZooKeeper zk = null;

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 创建ZK连接
     *
     * @param connectString  ZK服务器地址列表
     * @param sessionTimeout Session超时时间
     */
    public void createConnection(String connectString, int sessionTimeout) {
        this.releaseConnection();
        try {
            zk = new ZooKeeper(connectString, sessionTimeout, this);
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            System.out.println("连接创建失败,发生 InterruptedException");
            e.printStackTrace();
        } catch (IOException e) {
            System.out.println("连接创建失败,发生 IOException");
            e.printStackTrace();
        }
    }

    /**
     * 关闭ZK连接
     */
    public void releaseConnection() {
        if (null != this.zk) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                // ignore
                e.printStackTrace();
            }
        }
    }

    /**
     * 创建节点
     *
     * @param path 节点path
     * @param data 初始数据内容
     * @return
     */
    public boolean createPath(String path, String data) {
        try {
            System.out.println("节点创建成功, Path: "
                    + this.zk.create(path, // 节点路径
                    data.getBytes(), // 节点内容
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, //节点权限
                    CreateMode.PERSISTENT) //节点类型
                    + ", content: " + data);
        } catch (KeeperException e) {
            System.out.println("节点创建失败,发生KeeperException");
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("节点创建失败,发生 InterruptedException");
            e.printStackTrace();
        }
        return true;
    }

    /**
     * 读取指定节点数据内容
     *
     * @param path 节点path
     * @return
     */
    public String readData(String path) {
        try {
            System.out.println("获取数据成功,path:" + path);
            return new String(this.zk.getData(path, false, null));
        } catch (KeeperException e) {
            System.out.println("读取数据失败,发生KeeperException,path: " + path);
            e.printStackTrace();
            return "";
        } catch (InterruptedException e) {
            System.out.println("读取数据失败,发生 InterruptedException,path: " + path);
            e.printStackTrace();
            return "";
        }
    }

    /**
     * 更新指定节点数据内容
     *
     * @param path 节点path
     * @param data 数据内容
     * @return
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println("更新数据成功,path:" + path + ", stat: " +
                    this.zk.setData(path, data.getBytes(), -1));
        } catch (KeeperException e) {
            System.out.println("更新数据失败,发生KeeperException,path: " + path);
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("更新数据失败,发生 InterruptedException,path: " + path);
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 删除指定节点
     *
     * @param path 节点path
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println("删除节点成功,path:" + path);
        } catch (KeeperException e) {
            System.out.println("删除节点失败,发生KeeperException,path: " + path);
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("删除节点失败,发生 InterruptedException,path: " + path);
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        TestJavaApi sample = new TestJavaApi();

        sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
        if (sample.createPath(ZK_PATH, "我是节点初始内容")) {
            System.out.println();
            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");
            sample.writeData(ZK_PATH, "更新后的数据");
            System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");
            sample.deleteNode(ZK_PATH);
        }

        sample.releaseConnection();
    }

    /**
     * 收到来自Server的Watcher通知后的处理。
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println("收到事件通知:" + event.getState() + "\n");
        if (Event.KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }

    }

}
//结果:
//收到事件通知:SyncConnected
//节点创建成功, Path: /leader, content: 我是节点初始内容
//获取数据成功,path:/leader
//数据内容: 我是节点初始内容
//更新数据成功,path:/leader, stat: 8589934623,8589934624,1572320047950,1572320047969,1,0,0,0,18,0,8589934623
//获取数据成功,path:/leader
//数据内容: 更新后的数据
//删除节点成功,path:/leader
  • 监控watch
    zookeeper的监控都是一次性的所以 每次必须设置监控
    watch的process方法中,对不同事件进行处理
public class ZooKeeperWatcher implements Watcher  {

    /** 定义原子变量 */
    AtomicInteger seq = new AtomicInteger();
    /** 定义session失效时间 */
    private static final int SESSION_TIMEOUT = 10000;
    /** zookeeper服务器地址 */
    private static final String CONNECTION_ADDR = "172.16.223.132:2181";
    /** zk父路径设置 */
    private static final String PARENT_PATH = "/testWatch";
    /** zk子路径设置 */
    private static final String CHILDREN_PATH = "/testWatch/children";
    /** 进入标识 */
    private static final String LOG_PREFIX_OF_MAIN = "【Main】";
    /** zk变量 */
    private ZooKeeper zk = null;
    /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 创建ZK连接
     * @param connectAddr ZK服务器地址列表
     * @param sessionTimeout Session超时时间
     */
    public void createConnection(String connectAddr, int sessionTimeout) {
        this.releaseConnection();
        try {
            zk = new ZooKeeper(connectAddr, sessionTimeout, this);
            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
            connectedSemaphore.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭ZK连接
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 创建节点
     * @param path 节点路径
     * @param data 数据内容
     * @return
     */
    public boolean createPath(String path, String data) {
        try {
            //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
            this.zk.exists(path, true);
            System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +
                    this.zk.create( /**路径*/
                            path,
                            /**数据*/
                            data.getBytes(),
                            /**所有可见*/
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            /**永久存储*/
                            CreateMode.PERSISTENT ) +
                    ", content: " + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 读取指定节点数据内容
     * @param path 节点路径
     * @return
     */
    public String readData(String path, boolean needWatch) {
        try {
            return new String(this.zk.getData(path, needWatch, null));
        } catch (Exception e) {
            e.printStackTrace();
            return "";
        }
    }

    /**
     * 更新指定节点数据内容
     * @param path 节点路径
     * @param data 数据内容
     * @return
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
                    this.zk.setData(path, data.getBytes(), -1));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 删除指定节点
     *
     * @param path
     *            节点path
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 判断指定节点是否存在
     * @param path 节点路径
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取子节点
     * @param path 节点路径
     */
    private List<String> getChildren(String path, boolean needWatch) {
        try {
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 删除所有节点
     */
    public void deleteAllTestPath() {
        if(this.exists(CHILDREN_PATH, false) != null){
            this.deleteNode(CHILDREN_PATH);
        }
        if(this.exists(PARENT_PATH, false) != null){
            this.deleteNode(PARENT_PATH);
        }
    }

    /**
     * 收到来自Server的Watcher通知后的处理。
     */
    @Override
    public void process(WatchedEvent event) {

        System.out.println("进入 process 。。。。。event = " + event);

        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (event == null) {
            return;
        }

        // 连接状态
        Watcher.Event.KeeperState keeperState = event.getState();
        // 事件类型
        Watcher.Event.EventType eventType = event.getType();
        // 受影响的path
        String path = event.getPath();

        String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

        System.out.println(logPrefix + "收到Watcher通知");
        System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
        System.out.println(logPrefix + "事件类型:\t" + eventType.toString());

        if (Event.KeeperState.SyncConnected == keeperState) {
            // 成功连接上ZK服务器
            if (Event.EventType.None == eventType) {
                System.out.println(logPrefix + "成功连接上ZK服务器");
                connectedSemaphore.countDown();
            }
            //创建节点
             if (Event.EventType.NodeCreated == eventType) {
                System.out.println(logPrefix + "节点创建");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.exists(path, true);
            }
            //更新节点
            else if (Event.EventType.NodeDataChanged == eventType) {
                System.out.println(logPrefix + "节点数据更新");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
            }
            //更新子节点
            else if (Event.EventType.NodeChildrenChanged == eventType) {
                System.out.println(logPrefix + "子节点变更");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));
            }
            //删除节点
            else if (Event.EventType.NodeDeleted == eventType) {
                System.out.println(logPrefix + "节点 " + path + " 被删除");
            }
        }
        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
            System.out.println(logPrefix + "与ZK服务器断开连接");
        }
        else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
            System.out.println(logPrefix + "权限检查失败");
        }
        else if (Watcher.Event.KeeperState.Expired == keeperState) {
            System.out.println(logPrefix + "会话失效");
        }

        System.out.println("--------------------------------------------");

    }


    public static void main(String[] args) throws Exception {

        //建立watcher
        ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
        //创建连接
        zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
        //System.out.println(zkWatch.zk.toString());

        Thread.sleep(1000);

        // 清理节点
        zkWatch.deleteAllTestPath();



        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {



            // 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。watch是一次性的,
            // 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。
             System.out.println("---------------------- read parent ----------------------------");

          //  zkWatch.readData(PARENT_PATH, true);
            // 更新数据
           zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");


            /** 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged,
             也就是说创建子节点时没有watch。
             如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1时watch,输出c1的NodeChildrenChanged,
             而不会输出创建c2时的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,
             其中path="/p/c1"
             */
            System.out.println("---------------------- read children path ----------------------------");
            zkWatch.getChildren(PARENT_PATH, true);


           Thread.sleep(1000);

            // 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true)
            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

            Thread.sleep(1000);

            zkWatch.readData(CHILDREN_PATH, true);
            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
        }

        Thread.sleep(20000);
        // 清理节点
        zkWatch.deleteAllTestPath();
        Thread.sleep(1000);
        zkWatch.releaseConnection();
    }
}
//结果:
【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态:    SyncConnected
【Watcher-1】事件类型:    None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatch
【Main】节点创建成功, Path: /testWatch, content: 1572329669666
---------------------- read parent ----------------------------
【Main】更新数据成功,path:/testWatch, stat: 8589934628,8589934629,1572329669677,1572329669687,1,0,0,0,13,0,8589934628

---------------------- read children path ----------------------------
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态:    SyncConnected
【Watcher-2】事件类型:    NodeCreated
【Watcher-2】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatch/children
【Main】节点创建成功, Path: /testWatch/children, content: 1572329670705
【Watcher-3】收到Watcher通知
【Watcher-3】连接状态:    SyncConnected
【Watcher-3】事件类型:    NodeCreated
【Watcher-3】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatch
【Watcher-4】收到Watcher通知
【Watcher-4】连接状态:    SyncConnected
【Watcher-4】事件类型:    NodeChildrenChanged
【Watcher-4】子节点变更
【Main】更新数据成功,path:/testWatch/children, stat: 8589934630,8589934631,1572329670708,1572329671722,1,0,0,0,13,0,8589934630

【Watcher-4】子节点列表:[children]
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatch/children
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态:    SyncConnected
【Watcher-5】事件类型:    NodeDataChanged
【Watcher-5】节点数据更新
【Watcher-5】数据内容: 1572329669683
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatch
【Main】删除节点成功,path:/testWatch/children
【Main】删除节点成功,path:/testWatch
【Watcher-6】收到Watcher通知
【Watcher-6】连接状态:    SyncConnected
【Watcher-6】事件类型:    NodeChildrenChanged
【Watcher-6】子节点变更

原生客户端弊端:
1.会话连接是异步的
2.watch机制需要重复注册
3.session重连机制
4.开发复杂度高

2.2 zkClient
  • 基本操作
public class ZkClientOperator {

    /** zookeeper地址 */
    static final String CONNECT_ADDR = "172.16.223.132:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 10000;//ms

    public static void main(String[] args) throws Exception {
       //ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
        ZkClient zkc = new ZkClient(CONNECT_ADDR, SESSION_OUTTIME);

        //1. create and delete方法
        zkc.createEphemeral("/temp");
        zkc.createPersistent("/super/c1", true);
        Thread.sleep(10000);
        zkc.delete("/temp");
        zkc.deleteRecursive("/super");

        //2. 设置path和data 并且读取子节点和每个节点的内容
        zkc.createPersistent("/super", "1234");
        zkc.createPersistent("/super/c1", "c1内容");
        zkc.createPersistent("/super/c2", "c2内容");
        List<String> list = zkc.getChildren("/super");
        for(String p : list){
            System.out.println(p);
            String rp = "/super/" + p;
            String data = zkc.readData(rp);
            System.out.println("节点为:" + rp + ",内容为: " + data);
        }

        //3. 更新和判断节点是否存在
        zkc.writeData("/super/c1", "新内容");
        System.out.println(zkc.readData("/super/c1").toString());
        System.out.println(zkc.exists("/super/c1"));

//      4.递归删除/super内容
        zkc.deleteRecursive("/super");
    }
  • watch
public class TestZkClientWatcher {


    /** zookeeper地址 */
    static final String CONNECT_ADDR = "172.16.223.132:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 10000;//ms

    @Test
    /**
     * subscribeChildChanges方法 订阅子节点变化
     */
    public  void testZkClientWatcher1() throws Exception {
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

        //对父节点添加监听子节点变化。
        zkc.subscribeChildChanges("/super", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("parentPath: " + parentPath);
                System.out.println("currentChilds: " + currentChilds);
            }
        });

        Thread.sleep(3000);
        zkc.createPersistent("/super");
        Thread.sleep(1000);
        zkc.createPersistent("/super" + "/" + "c1", "c1内容");
        Thread.sleep(1000);
        zkc.createPersistent("/super" + "/" + "c2", "c2内容");
        Thread.sleep(1000);
        zkc.delete("/super/c2");
        Thread.sleep(1000);
        zkc.deleteRecursive("/super");
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Test
    /**
     * subscribeDataChanges 订阅内容变化
     */
    public void testZkClientWatcher2() throws Exception {
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
        zkc.createPersistent("/super", "1234");
        //对父节点添加监听子节点变化。
        zkc.subscribeDataChanges("/super", new IZkDataListener() {
            @Override
            public void handleDataDeleted(String path) throws Exception {
                System.out.println("删除的节点为:" + path);
            }
            @Override
            public void handleDataChange(String path, Object data) throws Exception {
                System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
            }
        });

        Thread.sleep(3000);
        zkc.writeData("/super", "456", -1);
        Thread.sleep(1000);
        zkc.delete("/super");
        Thread.sleep(Integer.MAX_VALUE);
    }
}
2.3 curator
  • 基本操作
public class OperatorTest {
    //ZooKeeper服务地址
    private static final String SERVER = "172.16.223.132:2181";
    //会话超时时间
    private final int SESSION_TIMEOUT = 30000;
    //连接超时时间
    private final int CONNECTION_TIMEOUT = 5000;
    //创建连接实例
    private CuratorFramework client = null;

    /**
     * baseSleepTimeMs:初始的重试等待时间
     * maxRetries:最多重试次数
     *
     *
     * ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增
     * RetryNTimes:重试N次
     * RetryOneTime:重试一次
     * RetryUntilElapsed:重试一定时间
     */
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

@org.junit.Before
    public void init(){
        //创建 CuratorFrameworkImpl实例
        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
        //启动
        client.start();
    }

    /**
     * 测试创建节点
     * @throws Exception
     */
    @Test
    public void testCreate() throws Exception{
        //创建永久节点
        client.create().forPath("/curator","/curator data".getBytes());
        //创建永久有序节点
      client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes());
        //创建临时节点
        client.create().withMode(CreateMode.EPHEMERAL)
                .forPath("/curator/ephemeral","/curator/ephemeral data".getBytes());
        //创建临时有序节点
        client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes());

    }

    /**
     * 测试检查某个节点是否存在
     * @throws Exception
     */
    @Test
    public void testCheck() throws Exception{
        Stat stat1 = client.checkExists().forPath("/curator");
        Stat stat2 = client.checkExists().forPath("/curator2");
        System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false));
        System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false));
    }

    /**
     * 测试异步设置节点数据
     * @throws Exception
     */
    @Test
    public void testSetDataAsync() throws Exception{
        //创建监听器
        CuratorListener listener = new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event)
                    throws Exception {
                System.out.println(event.getPath());
            }
        };

        //添加监听器
        client.getCuratorListenable().addListener(listener);
        //异步设置某个节点数据
        client.setData().inBackground().forPath("/curator","sync".getBytes());
        //为了防止单元测试结束从而看不到异步执行结果,因此暂停10秒
        Thread.sleep(10000);
    }

    /**
     * 测试另一种异步执行获取通知的方式
     * @throws Exception
     */
    @Test
    public void testSetDataAsyncWithCallback() throws Exception{
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event)
                    throws Exception {
                System.out.println(event.getPath());
            }
        };
        //异步设置某个节点数据
        client.setData().inBackground(callback).forPath("/curator","/curator modified data with Callback".getBytes());
        //为了防止单元测试结束从而看不到异步执行结果,因此暂停10秒
        Thread.sleep(10000);
    }

    /**
     * 测试删除节点
     * @throws Exception
     */
    @Test
    public void testDelete() throws Exception{
        //创建测试节点
        client.create().orSetData().creatingParentsIfNeeded()
                .forPath("/curator/del_key1","/curator/del_key1 data".getBytes());
        client.create().orSetData().creatingParentsIfNeeded()
                .forPath("/curator/del_key2","/curator/del_key2 data".getBytes());
        client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes());

        //删除该节点
        client.delete().forPath("/curator/del_key1");
        //级联删除子节点
        client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");
    }


    /*
     * 测试事务管理:碰到异常,事务会回滚
     * @throws Exception
     */
    @Test
    public void testTransaction() throws Exception{
        //定义几个基本操作
        CuratorOp createOp = client.transactionOp().create()
                .forPath("/curator/one_path","some data".getBytes());

        CuratorOp setDataOp = client.transactionOp().setData()
                .forPath("/curator","other data".getBytes());

        CuratorOp deleteOp = client.transactionOp().delete()
                .forPath("/curator");

        //事务执行结果
        List<CuratorTransactionResult> results = client.transaction()
                .forOperations(createOp,setDataOp,deleteOp);

        //遍历输出结果
        for(CuratorTransactionResult result : results){
            System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());
        }
    }
}

相关文章

网友评论

      本文标题:4. zk客户端

      本文链接:https://www.haomeiwen.com/subject/coqbvctx.html