美文网首页
Curator基础使用

Curator基础使用

作者: 农民工进城 | 来源:发表于2020-09-27 15:12 被阅读0次

    一 、简介

    Apache Curator是一个比较完善的,由Netflix公司开源的一套ZooKeeper的JAVA客户端框架组件。解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。

    二、基本API

    • ZK的节点类型有:PERSISTENT(持久化节点),PERSISTENT_SEQUENTIAL(顺序持久化节点),EPHEMERAL(临时节点),EPHEMERAL_SEQUENTIAL(临时顺序节点)。
    • 为了便于观察本文以PERSISTENT为例。
    • pom依赖
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>5.1.0</version>
            </dependency>
    
    2.1 初始化客户端
    curatorFramework = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
                    .connectString(ipPort) // 配置zk服务器IP port
                    .sessionTimeoutMs(4000)// 设定会话时间
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))// 设置及重连策略
                    .namespace("curator")// 方便管理的命名空间,其实就是一级目录
                    .build();// 建立管道
            curatorFramework.start();//启动
    
    2.2 创建节点
        public static String createZnode(String path, String value) throws Exception {
            return curatorFramework.create()// 创建Znode
                    .creatingParentsIfNeeded()// 如果是多级结点,这里声明如果需要,自动创建父节点
                    .withMode(CreateMode.PERSISTENT)// 声明结点类型
                    .forPath(path, value.getBytes());// 声明结点路径和值
        }
    
    2.3 删除节点
    public static void deleteZnode(String path) throws Exception {
            curatorFramework.delete().deletingChildrenIfNeeded()// 如果有子节点,会先自动删除子节点再删除本结点
                    .forPath(path);
        }
    
    
    2.4 节点是否存在
    public static boolean  checkExists(String path) throws Exception {
            Stat stat = curatorFramework.checkExists().forPath(path);
            return stat!=null;
        }
    
    2.5 修改节点
    
        public static void updateZnode(String path, String value) throws Exception {
            Stat stat = curatorFramework.checkExists().forPath(path);
            if (stat == null) {
                System.out.println("Znode does not exists");
            } else {
                curatorFramework.setData().withVersion(stat.getVersion()).forPath(path, value.getBytes());
            }
        }
    
    
    2.6 查询节点
    
        public static String getZnodeData(String path) throws Exception {
            byte[] dataBytes = curatorFramework.getData().forPath(path);
            return new String(dataBytes);
        }
    
    2.7 查询子节点
        public static List<String> getnodeChildren(String path) throws Exception {
            List<String> dataList = curatorFramework.getChildren().forPath(path);
            return dataList;
        }
    

    三、高级API

    curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

    • pom依赖
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>5.1.0</version>
            </dependency>
    
    3.1 CacheListener监听器

    CacheListener用来监控ZNode的节点. 当节点进行增,删,改时,会触发响应事件。主要有:NodeCacheListener,PathChildrenCacheListener,TreeCacheListener三类监听器

    NodeCacheListener:只是监听节点是否有变化,无法知道是什么事件
    PathChildrenCacheListener:监听子节点的增删改事件
    TreeCacheListener:监听增删改事件,包括自己跟子节点

    • 拿PathChildrenCacheListener举例
            public static void addWatcherWithChildrenCache(String path) throws Exception {
    
            CuratorCache curatorCache = CuratorCache.builder(curatorFramework, path).build();
            // 缓存数据
            PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
                        throws Exception {
                    System.out.println("事件路径:" + pathChildrenCacheEvent.getData().getPath() + "事件类型"
                            + pathChildrenCacheEvent.getType());
                }
            };
            CuratorCacheListener listener = CuratorCacheListener.builder()
                    .forPathChildrenCache(path, curatorFramework, pathChildrenCacheListener).build();
            curatorCache.listenable().addListener(listener);
            curatorCache.start();
        }
    
    3.2 分布式锁

    Curator的分布式锁有四个核心类,分别是:InterProcessMutex、InterProcessSemaphoreMutex、InterProcessReadWriteLock、InterProcessMultiLock。其作用:
    InterProcessMutex:分布式可重入排它锁
    InterProcessSemaphoreMutex:分布式排它锁
    InterProcessReadWriteLock:分布式读写锁
    InterProcessMultiLock:将多个锁作为单个实体管理的容器

    • API操作
       public void acquire() throws Exception;//获取锁,获取不到锁一直阻塞,zk连接中断则抛异常
       public boolean acquire(long time, TimeUnit unit) throws Exception;//获取锁,超过该时间后,直接返回false,zk连接中断则抛异常
       public void release() throws Exception;//释放锁
    

    操作简单不再举例

    3.3 分布式计数器

    分布式计数器有三个核心类:DistributedAtomicInteger,DistributedAtomicLong。这个两个除了计数范围不同外,没有任何不同。操作也非常简单,跟AtomicInteger大同小异。

    increment() //加1
    decrement() //减1
    compareAndSet(Integer expectedValue, Integer newValue) //cas操作
    get() //获取当前值
    

    操作简单不再举例

    参考:https://github.com/apache/curator/tree/master/curator-examples/src/main/java

    相关文章

      网友评论

          本文标题:Curator基础使用

          本文链接:https://www.haomeiwen.com/subject/nsvkuktx.html