找了一圈没找到官方的使用手册- -!,自己瞎练练了
常用的zookeeper java客户端:
- zookeeper原生Java API
- zkclient
- Apache curator
ZooKeeper原生Java API的不足之处:
- 在连接zk超时的时候,不支持自动重连,需要手动操作
- Watch注册一次就会失效,需要反复注册
- 不支持递归创建节点
Apache curator介绍:
Apache 的开源项目
- 解决Watch注册一次就会失效的问题
- 支持直接创建多级结点
- 提供的 API 更加简单易用
- 提供更多解决方案并且实现简单,例如:分布式锁
- 提供常用的ZooKeeper工具类
- 编程风格更舒服
pom
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
</dependencies>
一. 关于Curator的普通增删改查API操作
一个全局的 curator private static CuratorFramework curatorFramework;
创建连接
private static void createZkCuratorConnection(String ipPort) {
curatorFramework=CuratorFrameworkFactory
.builder() // 使用工厂类来建造客户端的实例对象
.connectString(ipPort) // 配置zk服务器IP port
.sessionTimeoutMs(4000)// 设定会话时间
.retryPolicy(new ExponentialBackoffRetry(1000,3))//设置及重连策略
.namespace("curator")//方便管理的命名空间,其实就是一级目录
.build();//建立管道
curatorFramework.start();//开启curator
}
关闭连接
private static void closeZkCuratorConnection() {
curatorFramework.close();
}
创建结点
这里就体现了用Cutaotr的好处了,不需要先创建父才创建子,可以设置检测需要父时候却没有的请看下自动创建
private static void createZnode(String path, String value) throws Exception {
curatorFramework
.create()//创建Znode
.creatingParentsIfNeeded()//如果是多级结点,这里声明如果需要,自动创建父节点
.withMode(CreateMode.PERSISTENT)//声明结点类型
.forPath(path,value.getBytes());//声明结点路径和值
}
删除结点
同样是cutator的优点,如果有子节点,会先自动删除子节点再删除本结点,可自动递归删除
private static void deleteZnode(String path) throws Exception {
curatorFramework
.delete()
.deletingChildrenIfNeeded()//如果有子节点,会先自动删除子节点再删除本结点
.forPath(path);
}
查询结点值
private static void getZnodeData(String path) throws Exception {
byte[] dataBytes = curatorFramework.getData().forPath(path);
System.out.println("结点值为:" +new String(dataBytes));
}
当然也可以顺便把结点状态存储下来.
private static void getZnodeData(String path) throws Exception {
Stat stat=new Stat();
byte[] dataBytes = curatorFramework.getData().storingStatIn(stat).forPath(path);
System.out.println("结点值为:" +new String(dataBytes));
System.out.print("目前版本为:"+stat.getVersion()+"创建时间为:"+stat.getCtime());
}

设置新值
private static void setValue(String path,String value) throws Exception {
Stat stat = curatorFramework.checkExists().forPath(path);
if (stat==null){
System.out.println("Znode does not exists");
}else {
curatorFramework
.setData()
.withVersion(stat.getVersion())
.forPath(path,value.getBytes());
}
}
重新调用查询方法看看新值

二 . Curator的事件机制
Cutator提供了三种完善而又灵活的监听机制
- PathchildCache ~监听一个节点下子节点的创建、删除、更新
- NodeCache ~监听一个节点的更新和创建事件(不包括删除)
- TreeCache ~综合PatchChildCache和INodeCache的特性
pom
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
添加事件监听机制
private static void addWatcherWithNodeCache(String path) throws Exception {
final NodeCache nodeCache=new NodeCache(curatorFramework,path,false);
NodeCacheListener nodeCacheListener=new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("事件路径:"+nodeCache.getCurrentData().getPath()+"发生数据变化,新数据为"+new String(nodeCache.getCurrentData().getData()));
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}


ps:这里别忘了我们是要监听结点变化,可不能一启动就停了,可以再工作线程重加入System.in.read();使线程持续运行.
结点的孩子结点变化
private static void addWatcherWithChildrenCache(String path) throws Exception {
final PathChildrenCache childrenCache=new PathChildrenCache(curatorFramework,path,true);//缓存数据
PathChildrenCacheListener pathChildrenCacheListener=new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("事件路径:"+pathChildrenCacheEvent.getData().getPath()+"事件类型"+pathChildrenCacheEvent.getType());
}
};
childrenCache.getListenable().addListener(pathChildrenCacheListener);
childrenCache.start(PathChildrenCache.StartMode.NORMAL);
}
Xshell测试

结果

关于pathChildrenCache启动过程有三种策略
- NORMAL
- BUILD_INITIAL_CACHE
- POST_INITIALIZED_EVENT
最后一个TreeNode测试
private static void addWatcherWithTreeCache(String path) throws Exception {
TreeCache treeCache=new TreeCache(curatorFramework,path);
TreeCacheListener treeCacheListener=new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("事件路径:"+treeCacheEvent.getData().getPath()+"事件类型"+treeCacheEvent.getType()+"结点值为"+new String(treeCacheEvent.getData().getData()));
}
};
treeCache.getListenable().addListener(treeCacheListener);
treeCache.start();
}


关于源码分析,这里推荐一篇博客,写的非常棒,非常详细
https://my.oschina.net/roccn/blog/918209
网友评论