zookeeper java api使用简介
1.建立连接
CONNECT_HOST 服务器地址及端口 例如127.0.0.1:2181
TIMEOUT 超时时间
new JavaApiClient() watch 必须实现Watch接口
zk = new ZooKeeper(CONNECT_HOST, TIMEOUT, new JavaApiClient());
2.获取数据
2.1 同步获取数据
- param 1:节点名称
- param 2:是否需要监听 true 默认的监听器
- param 3: stat节点状态信息,调用完成后能从此状态信息获取节点状态
/**
* 同步获取节点数据
*
* @return 节点数据
*/
private byte[] getDataSync() {
byte[] resdata = null;
try {
resdata = zk.getData("/worke_1", false, new Stat());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return resdata;
}
2.2 异步获取节点数据
getDataASync:
- param 1:节点名称
- param 2:是否监听
- param 3:回调类 异步成功后会调用该类的processResult方法
processResult:
- @param i i<0获取失败 i=0 获取成功
- @param s 节点名称
- @param o 传入的上下文
- @param bytes 节点数据
- @param stat 状态对象
/**
* 异步获取
*/
private void getDataASync() {
zk.getData("/worke_1", false, new GetDataCallBack(), null);
}
class GetDataCallBack implements AsyncCallback.DataCallback {
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
}
}
3. 创建节点
3.1 同步创建节点
create方法:
- param 1:节点名称
- param 2:节点存储数据
- param 3:acl权限
- param 4:节点类型 此处为临时节点
/**
* 同步创建节点
*
* @return 节点路径
*/
private String createNodeSync() {
String path = null;
try {
path = zk.create("/worke_1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return path;
}
3.2 异步创建节点
方法create:
- param 1.节点名称
- param 2.数据data
- param 3.权限acl
- param 4.节点类型
- param 5.回调类
- param 6.上下文
/**
* 异步创建节点
*/
private void createANodeASync() {
zk.create("/worke_1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new CreateStringCallBack(), "创建");
}
/**
* 异步创建调用类
*/
class CreateStringCallBack implements AsyncCallback.StringCallback {
/**
* @param i i<0创建失败 i=0 创建成功
* @param s 节点名称
* @param o 传入的上下文
* @param s1 节点名称
*/
public void processResult(int i, String s, Object o, String s1) {
}
}
4. 获取子节点列表
4.1 同步获取
getChildren:
- param 1:节点名
- param 2:是否监听
/**
* 获取node节点
*
* @return
*/
private List<String> listNodeSync() {
List<String> childrens = null;
try {
childrens = zk.getChildren("/", false);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return childrens;
}
4.2 异步获取
getChildren:
- param 1:节点名称
- param 2:是否监听
- param 3:回调类
- param 4:上下文
private void listNodeAsync() {
zk.getChildren("/", false, new GetCallBack(), "async get");
}
/**
* 异步获取调用类
*/
class GetCallBack implements AsyncCallback.ChildrenCallback {
/**
*
* @param i i<0 获取失败 i=0成功
* @param s s 节点
* @param o o 上下文信息
* @param list 子节点列表
*/
public void processResult(int i, String s, Object o, List<String> list) {
System.out.println("getcallback");
System.out.println(list);
}
}
5.判断节点是否存在
方法exists
- param 1: 节点名称
- param 2:是否监听
- return : 状态信息
/**
* 同步判断是否存在节点
*/
private boolean existNodeSync() {
boolean flag = false;
try {
System.out.println("判断是否存在");
Stat stat = zk.exists("/worke_1", false);
if (stat != null) {
flag = true;
}
System.out.println(stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return flag;
}
6. 删除节点
6.1 同步删除节点
delete:
- param 1:节点路径
- param 2:是否递归删除子节点信息
/**
* 同步删除节点
*
* @param path
* @param version version参数指定要更新的数据的版本, 如果version和真实的版本不同, 删除操作将失败. 指定version为-1则忽略版本检查.
*/
private void deleteNodeSync(String path, int version) {
try {
zk.delete(path, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
6.2 异步删除
delete:
- param 1: 节点名
- param 2: version
- param 3: 删除回调类
- param 4: 上下文信息
/**
* 异步删除节点
*
* @param path
* @param version version参数指定要更新的数据的版本, 如果version和真实的版本不同, 删除操作将失败. 指定version为-1则忽略版本检查.
*/
private void deleteNodeASync(String path, int version) {
zk.delete(path, -1, new DelCallBack(), "删除");
}
/**
* 删除回调
*/
class DelCallBack implements AsyncCallback.VoidCallback {
/**
*
* @param i i<0 删除失败 i=0 删除成功
* @param s s 节点
* @param o o 上下文信息
*/
public void processResult(int i, String s, Object o) {
}
}
7. 权限设置
权限模式(scheme):ip,digest
授权对象:
ip模式:具体的ip
digest模式: username:Base64(SHA-1(username:password))
private void createANodeASync() throws NoSuchAlgorithmException {
//ip权限
ACL aclIP = new ACL(ZooDefs.Perms.ADMIN, new Id("ip", "192.168.1.111"));
//用户名密码的复合权限
ACL aclDigest = new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE, new Id("digest", DigestAuthenticationProvider.generateDigest("test:test")))
ArrayList<ACL> aclList =new ArrayList<>();
aclList.add(aclIP);
aclList.add(aclDigest);
zk.create("/worke_1", "123".getBytes(), aclList, CreateMode.EPHEMERAL, new CreateStringCallBack(), "创建");
}
8. 监听事件
1.实现Watcher接口,实现process方法
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
}
}
总结
-
一次性监控:从原生的API中我们可以看出,由于Watch事件具有one-time trigger(一次性触发)的特性,所以我们每次操控节点时(增删改)都要提前注册监控,这是原生API的缺陷。
-
不返回数据:当我们对节点(增删改)操作完成之后,需要我们自己封装方法去读取更改后的内容,原生的API并不提供,这有点不符合我们的编程习惯。
-
不提供递归创建和递归删除:原生的API不提供递归创建节点和递归删除节点,也就是说,当我们创建子节点时,父节点必须存在,当我们删除节点时,该节点必须是子节点,若其父节点存在,会throw Exception。
网友评论