美文网首页
Curator基础使用

Curator基础使用

作者: 农民工进城 | 来源:发表于2020-09-27 15:12 被阅读0次

一 、简介

Apache Curator是一个比较完善的,由Netflix公司开源的一套ZooKeeper的JAVA客户端框架组件。解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。

二、基本API

  • ZK的节点类型有:PERSISTENT(持久化节点),PERSISTENT_SEQUENTIAL(顺序持久化节点),EPHEMERAL(临时节点),EPHEMERAL_SEQUENTIAL(临时顺序节点)。
  • 为了便于观察本文以PERSISTENT为例。
  • pom依赖
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.1.0</version>
        </dependency>
2.1 初始化客户端
curatorFramework = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
                .connectString(ipPort) // 配置zk服务器IP port
                .sessionTimeoutMs(4000)// 设定会话时间
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))// 设置及重连策略
                .namespace("curator")// 方便管理的命名空间,其实就是一级目录
                .build();// 建立管道
        curatorFramework.start();//启动
2.2 创建节点
    public static String createZnode(String path, String value) throws Exception {
        return curatorFramework.create()// 创建Znode
                .creatingParentsIfNeeded()// 如果是多级结点,这里声明如果需要,自动创建父节点
                .withMode(CreateMode.PERSISTENT)// 声明结点类型
                .forPath(path, value.getBytes());// 声明结点路径和值
    }
2.3 删除节点
public static void deleteZnode(String path) throws Exception {
        curatorFramework.delete().deletingChildrenIfNeeded()// 如果有子节点,会先自动删除子节点再删除本结点
                .forPath(path);
    }

2.4 节点是否存在
public static boolean  checkExists(String path) throws Exception {
        Stat stat = curatorFramework.checkExists().forPath(path);
        return stat!=null;
    }
2.5 修改节点

    public static void updateZnode(String path, String value) throws Exception {
        Stat stat = curatorFramework.checkExists().forPath(path);
        if (stat == null) {
            System.out.println("Znode does not exists");
        } else {
            curatorFramework.setData().withVersion(stat.getVersion()).forPath(path, value.getBytes());
        }
    }

2.6 查询节点

    public static String getZnodeData(String path) throws Exception {
        byte[] dataBytes = curatorFramework.getData().forPath(path);
        return new String(dataBytes);
    }
2.7 查询子节点
    public static List<String> getnodeChildren(String path) throws Exception {
        List<String> dataList = curatorFramework.getChildren().forPath(path);
        return dataList;
    }

三、高级API

curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

  • pom依赖
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.1.0</version>
        </dependency>
3.1 CacheListener监听器

CacheListener用来监控ZNode的节点. 当节点进行增,删,改时,会触发响应事件。主要有:NodeCacheListener,PathChildrenCacheListener,TreeCacheListener三类监听器

NodeCacheListener:只是监听节点是否有变化,无法知道是什么事件
PathChildrenCacheListener:监听子节点的增删改事件
TreeCacheListener:监听增删改事件,包括自己跟子节点

  • 拿PathChildrenCacheListener举例
        public static void addWatcherWithChildrenCache(String path) throws Exception {

        CuratorCache curatorCache = CuratorCache.builder(curatorFramework, path).build();
        // 缓存数据
        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
                    throws Exception {
                System.out.println("事件路径:" + pathChildrenCacheEvent.getData().getPath() + "事件类型"
                        + pathChildrenCacheEvent.getType());
            }
        };
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forPathChildrenCache(path, curatorFramework, pathChildrenCacheListener).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
    }
3.2 分布式锁

Curator的分布式锁有四个核心类,分别是:InterProcessMutex、InterProcessSemaphoreMutex、InterProcessReadWriteLock、InterProcessMultiLock。其作用:
InterProcessMutex:分布式可重入排它锁
InterProcessSemaphoreMutex:分布式排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器

  • API操作
   public void acquire() throws Exception;//获取锁,获取不到锁一直阻塞,zk连接中断则抛异常
   public boolean acquire(long time, TimeUnit unit) throws Exception;//获取锁,超过该时间后,直接返回false,zk连接中断则抛异常
   public void release() throws Exception;//释放锁

操作简单不再举例

3.3 分布式计数器

分布式计数器有三个核心类:DistributedAtomicInteger,DistributedAtomicLong。这个两个除了计数范围不同外,没有任何不同。操作也非常简单,跟AtomicInteger大同小异。

increment() //加1
decrement() //减1
compareAndSet(Integer expectedValue, Integer newValue) //cas操作
get() //获取当前值

操作简单不再举例

参考:https://github.com/apache/curator/tree/master/curator-examples/src/main/java

相关文章

网友评论

      本文标题:Curator基础使用

      本文链接:https://www.haomeiwen.com/subject/nsvkuktx.html