依赖
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5-cdh5.7.0</version>
</dependency>
连接Zookeeper
连接
ZooKeeper zk = new ZooKeeper("host000:2181", 5000,
(watchedEvent) -> System.out.println(String.format("接收到watcher通知:%s", watchedEvent)));
System.out.println("---------开始连接---------");
System.out.println(String.format("连接状态:%s",zk.getState()));
Thread.sleep(2000);
System.out.println(String.format("连接状态:%s",zk.getState()));
System.out.println("---------完成连接---------");
重连
ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
Thread.sleep(2000);
ZooKeeper zkReConn = new ZooKeeper("host000:2181", 5000, null, zk.getSessionId(), zk.getSessionPasswd());
System.out.println("---------开始连接---------");
System.out.println(String.format("连接状态:%s",zkReConn.getState()));
Thread.sleep(2000);
System.out.println(String.format("连接状态:%s",zkReConn.getState()));
System.out.println("---------完成连接---------");
原子操作
ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
ArrayList<Op> opList = new ArrayList<>();
opList.add(Op.delete("/baozi",-1));
opList.add(Op.delete("/baozi2", -1));
zk.multi(opList);
创建节点
同步
ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
String path = zk.create("/baozi", "data-baozi".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println(path);
异步
ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
zk.create("/baozi2", "data-baozi2".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
((rc, path, ctx, name) -> {
String res = String.format("返回的状态码:%s, " +
"创建路径:%s, " +
"办正事的对象:%s, " +
"真实路径:%s",
rc,path,ctx,name);
System.out.println(res);
}),
"{'create':'success'}"
);
// 等创建操作
Thread.sleep(2000);
// 返回结果: 返回的状态码:0, 创建路径:/baozi2, 办正事的对象:{'create':'success'}, 真实路径:/baozi2
监听节点
ZooKeeper zk = null;
@Test
public void getDataAndWatch() throws IOException, KeeperException, InterruptedException {
zk = new ZooKeeper("host000:2181", 5000, (watchEvent) -> {
// 监听到事件之后,重新获取节点
System.out.println("--------------------------");
try {
zk.getChildren("/", true).forEach(System.out::println);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("--------------------------");
});
// 获取节点,打印节点
zk.getChildren("/", true).forEach(System.out::println);
Thread.sleep(Long.MAX_VALUE);
}
判断节点是否存在
@Test
public void exist() throws IOException, KeeperException, InterruptedException {
ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
Stat stat = zk.exists("/baozi", false);
System.out.println(stat == null ? "not exist" : "exist");
}
查看ACL
ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
// 默认权限
zk.create("/baozi","data-baozi".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 查看权限
ArrayList<ACL> acls = (ArrayList<ACL>) zk.getACL("/baozi", new Stat());
acls.forEach(System.out::println); // 31,s{'world,'anyone}
自定义ACL
ZooKeeper zk = new ZooKeeper("host000:2181", 5000, null);
// Id(scheme,id)+Perms(permissions)
// 多个可以用| --- ZooDefs.Perms.CREATE | ZooDefs.Perms.READ
String idPwd = DigestAuthenticationProvider.generateDigest("baozi:baozi");
List<ACL> acls = Arrays.asList(
// 可以添加多个自定义ACL
new ACL(ZooDefs.Perms.ALL, new Id("digest", idPwd))
);
// 创建
zk.create("/baozi", "data-baozi".getBytes(), acls, CreateMode.EPHEMERAL);
// 查看权限: 31,s{'digest,'baozi:15AYXiVWL/Rytxd36t6YS6HvuY0=}
zk.getACL("/baozi", new Stat()).forEach(System.out::println);
案例
- 在Zookeeper上创建一个/servers节点
- 服务端代码
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class DistributeServer {
private String connectString = "host000:2181";
private ZooKeeper zk = null;
private void openConnect() throws IOException {
zk = new ZooKeeper(connectString, 5000, null);
}
private void register(String serverName) throws KeeperException, InterruptedException {
zk.create("/servers/server",
serverName.getBytes(), // 注册的数据是serverName,服务器名称
ZooDefs.Ids.OPEN_ACL_UNSAFE,
// 因为服务下线节点就消失,所以用-e
// 因为每注册一个节点就递增一次序列,所以用-s
CreateMode.EPHEMERAL_SEQUENTIAL
);
}
private void business() throws InterruptedException {
// 为了保持上线
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer server = new DistributeServer();
// 建立连接
server.openConnect();
// 注册服务(创建节点
server.register(args[0]);
// 处理业务
server.business();
}
}
- 客户端代码
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
public class DistributeClient {
private String connectString = "host000:2181";
private ZooKeeper zk = null;
private void openConnect() throws IOException {
zk = new ZooKeeper(connectString, 5000, (event -> {
try {
listen();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
private void listen() throws KeeperException, InterruptedException {
// 监听servers节点,并获得子节点
List<String> nodes = zk.getChildren("/servers", true);
for (String node : nodes) {
// 获取子节点的数据,也就是服务器名称
byte[] serverName = zk.getData("/servers/"+node, false, null);
System.out.println(new String(serverName, 0, serverName.length));
}
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
// 1.连接Zookeeper
client.openConnect();
// 2.注册监听
client.listen();
// 3.业务逻辑处理
client.business();
}
}
网友评论