美文网首页ZooKeeper新手
Zookeeper入门之三-Java客户端curator的使用

Zookeeper入门之三-Java客户端curator的使用

作者: AlanKim | 来源:发表于2019-02-05 00:32 被阅读56次

    ZK的java客户端—curator 基本使用

    普通的增删改查实现--同步接口
     public class CuratorConTest {
    
        static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // 重试3次
    
        // 创建连接 -- 传统写法
    /*        CuratorFramework zkClient = CuratorFrameworkFactory.newClient("localhost:32770",
                    5000,
                    3000,
                    policy);*/
    
        // 创建连接-- 流式写法
        static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
                .connectString("localhost:32770")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(policy)
                .namespace("zk-jsy")  // 如果指定了某个应用只能在某一个节点下操作,
                // 可以指定namespace,这里base表示路径为/base。记得不能直接用/,会报错。
                .build();
    
    
        public static void main(String[] args) throws Exception {
    
            // 连接开启
    //        zkClient.start();
    
            zkFluentClient.start();
    
            // 测试创建
            CuratorConTest test = new CuratorConTest();
            test.testCreate();
    
            // 测试获取
            test.testGet();
    
            // 测试更新
            test.testUpdate();
    
            // 测试删除
            test.testDelete();
    
    //        Thread.sleep(Integer.MAX_VALUE);
        }
    
        private void testCreate() throws Exception {
            // 1、创建默认类型节点,书上说默认内容为空,但是实际上上我本地的ip地址
            //org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /zk-jsy/book
            // 是因为zookeeper的版本和curator的版本不兼容导致的,默认zk的版本是3.5.1-Alpha,降级成3.4.8即可
            zkFluentClient.create().forPath("/book1-" + ThreadLocalRandom.current().nextInt());
    
            // 2、创建有默认值的节点
            zkFluentClient.create().forPath("/book2-" + ThreadLocalRandom.current().nextFloat(), "mytestbook2Create".getBytes());
    
            // 3、创建临时节点,断开后会自动清除
            zkFluentClient.create().withMode(CreateMode.EPHEMERAL).
                forPath("/book3-" + ThreadLocalRandom.current().nextInt());
    
            // 4、创建临时节点,同时如果父节点不存在,也把父节点创建了。但是父节点会是持久节点
            zkFluentClient.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL).
                forPath("/test/book4-test" + ThreadLocalRandom.current().nextInt());
        }
    
        private void testGet() throws Exception {
            String path = "/getData/mydata-" + ThreadLocalRandom.current().nextInt();
            zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
                    ("sogetdata" + ThreadLocalRandom.current().nextInt()).getBytes());
    
            // 1、获取,注意返回的是bytes
            String value = new String(zkFluentClient.getData().forPath(path));
            System.out.println(value);
    
            // 2、获取属性
            Stat stat = new Stat();
            String value11 = new String(zkFluentClient.getData().storingStatIn(stat).forPath(path));
            System.out.println(stat.toString());
            System.out.println(value11);
        }
    
        private void testUpdate() throws Exception {
            String path = "/updateData/mydata-" + ThreadLocalRandom.current().nextInt();
            zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
                    ("toBeUpdate" + ThreadLocalRandom.current().nextInt()).getBytes());
            System.out.println("originData:" + new String(zkFluentClient.getData().forPath(path)));
    
            // 1、 普通的update,不管version
            Stat stat = zkFluentClient.setData().forPath(path, ("newData" + ThreadLocalRandom.current().nextInt()).getBytes());
            System.out.println("newData:" + new String(zkFluentClient.getData().forPath(path)));
    
            // 2、乐观锁更新,可以用来实现CAS,如果version不匹配,是无法更新的
            zkFluentClient.setData().withVersion(stat.getVersion()).forPath(path, ("UpdateByVersion:" + stat.getVersion()).getBytes());
            System.out.println("updateByVersionData:" + new String(zkFluentClient.getData().forPath(path)));
    
            // 2.1 测试cas,传入version=1,而当前实际为2
            zkFluentClient.setData().withVersion(1).forPath(path,"error".getBytes());
            System.out.println("updateByErrorVersionData:" + new String(zkFluentClient.getData().forPath(path)));
            // KeeperErrorCode = BadVersion for /zk-jsy/updateData/mydata--1431282676 返回这个异常,version不对
        }
    
        private void testDelete() throws Exception {
    
            // 1、普通删除,但是不能删除含有叶子节点的父节点
            String path = "/book/forDelete" + ThreadLocalRandom.current().nextInt();
            zkFluentClient.create().creatingParentsIfNeeded().forPath(path);
            //Thread.sleep(20000);  // sleep期间可以看到对应的节点
            zkFluentClient.delete().forPath(path);  // 删除节点
    
            // 2、删除节点,以及递归删除其子节点,如果传入/,删除的是 namespace下的根目录
            zkFluentClient.delete().deletingChildrenIfNeeded().forPath("/book");
    
            // 3、无论如何,只要客户端连接存在,就会一直重试,直到删除成功,避免因为集群选主等情况造成数据无法清除
            zkFluentClient.delete().guaranteed().deletingChildrenIfNeeded().forPath("/");
        }
    }
    
    创建的异步实现
    /**
     * 异步实现
     */
    public class AsyncCuratorTest {
    
        static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // 重试3次
    
        // 创建连接-- 流式写法
        static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
                .connectString("localhost:32770")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(policy)
                .namespace("zk-asyncjsy")  // 如果指定了某个应用只能在某一个节点下操作,
                // 可以指定namespace,这里base表示路径为/base。记得不能直接用/,会报错。
                .build();
    
        static CountDownLatch countDownLatch = new CountDownLatch(2); // countDownLatch
    
        static ExecutorService tp = Executors.newFixedThreadPool(2); // ThreadPool
    
        public static void main(String[] args) throws Exception {
    
            String path = "/asyncCreate" + ThreadLocalRandom.current().nextInt();
    
            zkFluentClient.start();
    
            // 异步创建 -1
            zkFluentClient.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());
    
                            System.out.println("Thread of processResult:" + Thread.currentThread().getName());
    
                            countDownLatch.countDown();
                        }
                    }, tp).forPath(path, "createInfo".getBytes());
    
            // 异步创建 -2 ,会重复,返回错误码
            zkFluentClient.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());
    
                            System.out.println("Thread of processResult:" + Thread.currentThread().getName());
    
                            countDownLatch.countDown();
                        }
                    }).forPath(path, "createAgain".getBytes());
    
            countDownLatch.await();
            tp.shutdown();
        }
    }
    

    执行结果如下:

    Event[code:0, type:CREATE
    Thread of processResult:pool-3-thread-1
    Event[code:-110, type:CREATE
    Thread of processResult:main-EventThread

    注意以下方面:

    1. 第一次异步回调传入了 线程池tp,用于在线程池中执行对应的回调,可以从结果中看到,执行的线程并不是Main线程
    2. 第二次异步回调,并没有传入线程池tp,所以执行操作的是主线程Main
    3. countDownLatch在这里只是为了保证线程执行结束后,可以shutdown线程池
    4. 返回的event.getResultCode如果是0,表示操作成功,如果是其他值,表示不成功,比如-110,表示数据节点已存在。

    相关文章

      网友评论

        本文标题:Zookeeper入门之三-Java客户端curator的使用

        本文链接:https://www.haomeiwen.com/subject/wvkyjqtx.html