zookeeper的开源客户端有基于原生的zookeeper的客户端
还有zkclient
还有curator
,下面就针对原生的api和curator的api来实现zookeeper的基本节点的操作
1.基于zookeeper原生的客户端展示
- 依赖jar的环境
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
- 代码演示
/**
* @Project: 3.DistributedProject
* @description: 用原生zookeeper客户端的api
* @author: sunkang
* @create: 2018-06-23 13:04
* @ModificationHistory who when What
**/
public class OrginZookeeperConnectionDemo {
public static void main(String[] args) throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if( watchedEvent.getState() == Event.KeeperState.SyncConnected){
//连接成功会有SyncConnected事件产生
System.out.println("默认事件"+watchedEvent.getPath()+"->"+watchedEvent.getState()+"->"+watchedEvent.getType());
//如果收到了服务端的响应事件,连接成功,接下来才可以对zookeeper的数据节点进行操作
countDownLatch.countDown();
}
}};
final ZooKeeper zooKeeper = new ZooKeeper("192.168.44.129:2181", 4000,watcher );
countDownLatch.await();
zooKeeper.exists("/zk-test-create", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getPath()+"->"+watchedEvent.getState()+"->"+watchedEvent.getType());
//再次绑定
try {
//这里会触发默认的全局事件
zooKeeper.exists(watchedEvent.getPath(),true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//1.创建临时节点
zooKeeper.create("/zk-test-create","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
Thread.sleep(1000);
System.out.println("添加节点成功");
Stat state = new Stat();
//2.得到当前节点的值
byte[] bytes = zooKeeper.getData("/zk-test-create",null,state);
System.out.println("createNode的当前的值为:"+ new String(bytes));
//3.修改当前节点的值
zooKeeper.setData("/zk-test-create","2".getBytes(),state.getVersion());
//得到当前节点的值
byte[] byte1s = zooKeeper.getData("/zk-test-create",null,state);
System.out.println("createNode的修改后值为 : "+ new String(byte1s));
//4.查看子节点
List<String> childrenList = zooKeeper.getChildren("/",false);
System.out.println("childrenList: "+childrenList);
//5.删除节点的值
zooKeeper.delete("/zk-test-create",state.getVersion());
//6.设置权限认证
zooKeeper.addAuthInfo("digest","foo:true".getBytes());
zooKeeper.create("/zk-book-auth_test","init".getBytes(),ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL);
//7.新建客户端需要认证获取对应的节点
byte[] authBytes = zooKeeper.getData("/zk-book-auth_test",false,null);
System.out.println("/zk-book-auth_test的value:"+ new String(authBytes) );
zooKeeper.close();
// System.in.read();
}
}
- 输出结果如下
原生的pai需要watch触发一次就失效了,需要重复的注册watcher,一般来说,客户端启动的时候,可以传入一个全局的watcher,这个watcher会一直有效,当extist的watcher的时候,可以通过传入true来判断是否使用全局默认的watcher
默认事件null->SyncConnected->None
/zk-test-create->SyncConnected->NodeCreated
添加节点成功
createNode的当前的值为:0
默认事件/zk-test-create->SyncConnected->NodeDataChanged
createNode的修改后值为 : 2
childrenList: [curator_recipes_distatomicint_path, zookeeper, zk-test-create, curator_recipes_master_path, curator_recipes_lock_path, locks, c1, curator_recipes_barrier_path]
/zk-book-auth_test的value:init
2..基于curator的客户端展示
- 增加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
- 代码演示
/**
* @Project: 3.DistributedProject
* @description: 使用curator来操作节点
* @author: sunkang
* @create: 2018-06-23 14:25
* @ModificationHistory who when What
**/
public class CuratorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curator =CuratorFrameworkFactory.builder()
.connectString("192.168.44.129:2181")
.connectionTimeoutMs(4000)//连接超时时间设置4秒中
.sessionTimeoutMs(4000)//session超时设置4秒中
.retryPolicy(new ExponentialBackoffRetry(1000,3))//设置连接的重试机制
.namespace("curator")//设置命名空间,表明接下来的节点操作都在/curator的下进行操作
.build();
//启动
curator.start();
//1.创建节点 creatingParentsIfNeeded如果子节点的父级节点不存在,会联级创建
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/sunkang/test","sunkang".getBytes());
System.out.println("创建节点成功");
//2.获取节点的状态
Stat state =new Stat();
System.out.println("/sunkang/test的值: "+ new String(curator.getData().storingStatIn(state).forPath("/sunkang/test")));
//3.设置改变节点
curator.setData().withVersion(state.getAversion()).forPath("/sunkang/test","xx".getBytes());
//4.获取子节点
List<String> childrens = curator.getChildren().forPath("/sunkang");
System.out.println("childrens : "+childrens);
//5.检查是否存在
Stat stat = curator.checkExists().forPath("/sunkang");
System.out.println("state: "+ stat);
//6.使用watcher
curator.getChildren().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath()+"->"+event.getState()+"->"+event.getType());
}
}).forPath("/sunkang");
//7.删除节点,deletingChildrenIfNeeded表示级联删除
curator.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath("/sunkang");
System.in.read();
}
}
- 输出结果
创建节点成功
/sunkang/test的值: sunkang
childrens : [test]
state: 505,505,1542703973660,1542703973660,0,1,0,0,0,1,506
/sunkang->SyncConnected->NodeChildrenChanged
3..基于curator的客户端监听的展示
主要利用了NodeCache和PathChildrenCache以及和TreeCache来实现监听。
代码如下 :
/**
* @Project: 3.DistributedProject
* @description: curator实现监听
* @author: sunkang
* @create: 2018-06-23 14:45
* @ModificationHistory who when What
**/
public class CuratorWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curator =CuratorFrameworkFactory.builder()
.connectString("192.168.44.129:2181")
.connectionTimeoutMs(4000)
.sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.namespace("curator")
.build();
curator.start();
//当前节点的监听
// addListenerWhitNodeCash(curator,"/sunkang");
//监听子节点的监听
// addListenerWhitPathChildCash(curator,"/sunkang");
//综合性事件
addListenerWithTreeCache(curator,"/sunkang");
System.in.read();
}
/**
* 即节点的监听又监听子节点的监听
* @param curator
* @param s
* @throws Exception
*/
private static void addListenerWithTreeCache(CuratorFramework curator, String s) throws Exception {
final TreeCache treeCache = new TreeCache(curator,s);
TreeCacheListener listener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("zonghe "+treeCacheEvent.getData()+";"+treeCacheEvent.getType());
}
};
treeCache.getListenable().addListener(listener);
treeCache.start();
}
/**
*对给具体的节点的子节点的增加监听,子节点的删除,创建和数据节点的内容发生变化,会触发监听事件
* @param curator
* @param s
* @throws Exception
*/
private static void addListenerWhitPathChildCash(final CuratorFramework curator, String s) throws Exception {
final PathChildrenCache pathChildrenCache =new PathChildrenCache(curator,s,true);
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED," + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED," + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED," + event.getData().getPath());
break;
default:
break;
}
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start();
}
/**
* 给具体的节点的增加监听,创建,删除,数据值改变
* @param curator
* @param s
* @throws Exception
*/
private static void addListenerWhitNodeCash(CuratorFramework curator, String s) throws Exception {
final NodeCache nodeCache = new NodeCache(curator,s);
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Node data update, new data: " + new String(nodeCache.getCurrentData().getData()));
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
}
更多的操作内容可以参考:https://github.com/sunkang123/zookeeper
网友评论