pom文件引入zookeeper包
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
连接zk并监听事件
/**
* 连接zk并监听事件
*/
public class ZKDemo implements Watcher {
private static final CountDownLatch cdl = new CountDownLatch(1);
public static void main(String[] args) throws IOException {
ZooKeeper zk = new ZooKeeper("10.143.143.185:6181", 5000, new ZKDemo());
System.out.println(zk.getState());
try {
cdl.await();
} catch (Exception e) {
System.out.println("ZK Session established.");
}
}
//监听到事件时进行处理
@Override
public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event);
if (KeeperState.SyncConnected == event.getState()) {
cdl.countDown();
}
}
}
image.png
如何创建znode并监听事件
/**
* 创建znode并监听事件
*/
public class ZKOperateDemo implements Watcher {
private static final CountDownLatch cdl = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zk = new ZooKeeper("10.143.143.185:6181", 5000, new ZKOperateDemo());
cdl.await();
String path1 = zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("Success create path: " + path1);
String path2 = zk.create("/zk-test", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("Success create path: " + path2);
}
//监听到事件时进行处理
@Override
public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event);
if (KeeperState.SyncConnected == event.getState()) {
cdl.countDown();
}
}
}
image.png
image.png
改变znode数据并监听事件
/**
* 改变znode数据并监听事件
*/
public class ZKDataDemo implements Watcher {
private static final CountDownLatch cdl = new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper("10.143.143.185:6181", 5000, new ZKDataDemo());
cdl.await();
zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(new String(zk.getData("/zk-test", true, stat)));
zk.getData("/zk-test", true, stat);
System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
zk.setData("/zk-test", "456".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
//监听到事件时进行处理
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
cdl.countDown();
} else if (event.getType() == EventType.NodeDataChanged) {
try {
System.out.println("path = " + event.getPath() +" " + new String(zk.getData(event.getPath(), true, stat)));
System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
} catch (Exception e) {
}
}
}
}
}
image.png
image.png
改变子节点并监听事件
/**
* 改变子节点并监听事件
*/
public class ZKChildrenDemo implements Watcher {
private static final CountDownLatch cdl = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper("10.143.143.185:6181", 5000, new ZKChildrenDemo());
cdl.await();
zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
List<String> list = zk.getChildren("/zk-test", true);
for (String str : list)
System.out.println(str);
zk.create("/zk-test/c2", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState())
if (EventType.None == event.getType() && null == event.getPath()) {
cdl.countDown();
} else if (event.getType() == EventType.NodeChildrenChanged) {
try {
System.out.println("Child: " + zk.getChildren(event.getPath(), true));
} catch (Exception e) {
}
}
}
}
image.png
image.png
异步调用并完成回调
//异步调用并完成回调
class ChildrenCallback implements AsyncCallback.Children2Callback {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
System.out.println(
"Child: " + rc + ", path: " + path + ", ctx: " + ctx + ", children: " + children + ", stat: " + stat);
}
}
public class ZKChildrenAsyncDemo implements Watcher {
private static final CountDownLatch cdl = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper("10.143.143.185:6181", 5000, new ZKChildrenAsyncDemo());
cdl.await();
zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.getChildren("/zk-test", true, new ChildrenCallback(), "ok");
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState())
if (EventType.None == event.getType() && null == event.getPath()) {
cdl.countDown();
} else if (event.getType() == EventType.NodeChildrenChanged) {
try {
System.out.println("Child: " + zk.getChildren(event.getPath(), true));
} catch (Exception e) {
}
}
}
}
image.png
image.png
网友评论