ZkClient:开源的 ZooKeeper 客户端,在原生 API 的基础上进行了封装,使用起来更加简单
- 创建会话(同步,重试)
- 创建节点(同步,递归创建)
- 删除节点(同步,递归删除)
- 代码示例
import net.lc7.model.User;
import net.lc7.util.ZkPropertiesUtil;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.List;
/**
 * Demo of basic ZooKeeper operations through the ZkClient library: creating, reading,
 * updating and deleting nodes, and subscribing to child / data changes.
 *
 * <p>{@code main} connects, runs {@code updateNodeData()} and closes the client. The other
 * private helpers are alternative demos that can be called from {@code main} instead.
 *
 * @Author: Jason.zhu
 * @Create: 2019/05/22 17:37
 */
public class ZkClientMain {
    private static ZkClient zkClient;

    /**
     * Connects to the servers configured in {@code ZkPropertiesUtil}, runs the selected demo
     * and always closes the client afterwards.
     *
     * @param args unused
     * @throws Exception if the demo fails
     */
    public static void main(String[] args) throws Exception {
        String serverAddresses = ZkPropertiesUtil.getZkServerIp();
        int sessionTimeout = 10000;
        int connectionTimeout = ZkPropertiesUtil.getZktimeout();
        zkClient = new ZkClient(serverAddresses, sessionTimeout, connectionTimeout, new SerializableSerializer());
        try {
            updateNodeData();
        } finally {
            // Release the ZooKeeper session even when the demo throws.
            zkClient.close();
        }
    }

    /**
     * Creates the persistent node {@code /user} holding a serialized {@link User}.
     */
    private static void createNode() {
        User user = User.builder().age(18).id(1).name("jason").build();
        String node = zkClient.create("/user", user, CreateMode.PERSISTENT);
        System.out.println(node);
    }

    /**
     * Reads the {@link User} stored at {@code /user} and prints its name and the node stat.
     */
    private static void getNodeData() {
        Stat stat = new Stat();
        User user1 = zkClient.readData("/user", stat);
        System.out.println("name : " + user1.getName());
        System.out.println("stat : " + stat);
    }

    /**
     * Creates three persistent child nodes under {@code /user}.
     */
    private static void createChildrenNode() {
        String node = zkClient.create("/user/1", "user1", CreateMode.PERSISTENT);
        System.out.println(node);
        node = zkClient.create("/user/2", "user2", CreateMode.PERSISTENT);
        System.out.println(node);
        node = zkClient.create("/user/3", "user3", CreateMode.PERSISTENT);
        System.out.println(node);
    }

    /**
     * Prints the names of the children of {@code /user}, if that node exists.
     */
    private static void getChildNode() {
        String node = "/user";
        if (zkClient.exists(node)) {
            List<String> childNodes = zkClient.getChildren(node);
            childNodes.forEach(System.out::println);
        }
    }

    /**
     * Deletes {@code /user} together with its children, if the node exists.
     */
    private static void delNode() {
        String node = "/user";
        if (zkClient.exists(node)) {
            // A plain delete(node) fails when the node has children, so delete recursively.
            zkClient.deleteRecursive(node);
        }
    }

    /**
     * Overwrites the data of {@code /user} with a new {@link User}.
     */
    private static void updateNodeData() {
        User user = User.builder().age(11).name("wangwu").id(12).build();
        String node = "/user";
        zkClient.writeData(node, user);
    }

    /**
     * Subscribes to child changes of {@code /user}, printing the parent path and the current
     * child list on every change, then blocks the calling thread.
     *
     * @throws InterruptedException if the blocking sleep is interrupted
     */
    private static void watchChildChange() throws InterruptedException {
        String node = "/user";
        zkClient.subscribeChildChanges(node, (parentPath, currentChildren) -> {
            System.out.println(parentPath);
            System.out.println(currentChildren);
        });
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * Subscribes to data changes and deletion of {@code /user}, then blocks the calling thread
     * until it is interrupted.
     */
    private static void watchNode() {
        String node = "/user";
        zkClient.subscribeDataChanges(node, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("node name : " + s + ",updated !!");
                User user = (User) o;
                System.out.println("node new value : " + user);
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("node name : " + s + ", deleted !!");
            }
        });
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
 * Serializable value object with an id, an age and a name.
 * Accessors and the builder are generated by Lombok ({@code @Data}, {@code @Builder}).
 */
@Data
@Builder
public class User implements Serializable {
    // NOTE(review): no serialVersionUID is declared, so the JVM-computed default is used
    // whenever instances are serialized.
    private int id;
    private int age;
    private String name;

    /**
     * Renders the three fields as {@code id=<id>, age=<age>,name=<name>}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("id=").append(id);
        text.append(", age=").append(age);
        text.append(",name=").append(name);
        return text.toString();
    }
}
Curator:开源的 ZooKeeper 客户端,在原生 API 的基础上进行了封装,是 Apache 顶级项目
- Curator采用Fluent风格API
- Curator对zk进行基本操作代码示例:
import net.lc7.util.ZkPropertiesUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
/**
 * Basic ZooKeeper operations with Curator's fluent API: create a node, list children,
 * read data, update data and delete the node, echoing each step in a shell-like format.
 *
 * @Author: Jason.zhu
 * @Create: 2019/05/24 17:55
 */
public class CuratorClient {
    private static CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
            ZkPropertiesUtil.getZkServerIp(),
            new RetryNTimes(10, 5000));
    private static String path = "/curator_node";

    /**
     * Runs the create / read / update / delete walkthrough against {@code /curator_node}.
     * The client is closed in a {@code finally} block so a failing step does not leak it.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        curatorFramework.start();
        try {
            //create node
            String data = "curator_data";
            curatorFramework.create().creatingParentsIfNeeded()
                    .forPath(path, data.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            //get Node and Data
            print("ls", "/");
            print(curatorFramework.getChildren().forPath("/"));
            print("get", path);
            print(curatorFramework.getData().forPath(path));

            //update node data
            String dataNew = "curator_data_new";
            print("set", path, dataNew);
            curatorFramework.setData()
                    .forPath(path, dataNew.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            print("get", path);
            print(curatorFramework.getData().forPath(path));

            //remove node
            print("delete", path);
            curatorFramework.delete().forPath(path);
            print("ls", "/");
            print(curatorFramework.getChildren().forPath("/"));
        } finally {
            curatorFramework.close();
        }
    }

    /**
     * Prints the given words as a shell-style command line: {@code "$ "} followed by each
     * word and a trailing space.
     *
     * @param cmds command name and its arguments
     */
    private static void print(String... cmds) {
        StringBuilder text = new StringBuilder("$ ");
        for (String cmd : cmds) {
            text.append(cmd).append(" ");
        }
        System.out.println(text.toString());
    }

    /**
     * Prints a command result. Byte arrays are decoded as UTF-8 text (matching how this class
     * encodes node data); any other object is printed through its {@code toString()}.
     *
     * @param result value returned by a Curator call
     */
    private static void print(Object result) {
        System.out.println(
                result instanceof byte[]
                        ? new String((byte[]) result, java.nio.charset.StandardCharsets.UTF_8)
                        : result);
    }
}
/**
 * Reads ZooKeeper connection settings from the classpath resource {@code /zk.properties}.
 * The file is loaded once, when the class is initialized.
 */
public class ZkPropertiesUtil {
    private static Properties zkProperties = new Properties();

    static {
        // try-with-resources closes the stream; a null resource (file not found) is skipped.
        try (InputStream is = ZkPropertiesUtil.class.getResourceAsStream("/zk.properties")) {
            if (is == null) {
                // Loading stays best-effort, but a missing file is reported explicitly
                // rather than as a NullPointerException from Properties.load.
                System.err.println("zk.properties was not found on the classpath");
            } else {
                zkProperties.load(is);
            }
        } catch (IOException | IllegalArgumentException e) {
            // IllegalArgumentException covers malformed Unicode escapes in the file.
            e.printStackTrace();
        }
    }

    /**
     * Returns the value of {@code zk.zkServerIps}.
     *
     * @return the configured server list, or {@code null} when the property is not set
     */
    public static String getZkServerIp() {
        return zkProperties.getProperty("zk.zkServerIps");
    }

    /**
     * Returns the value of {@code zk.timeout} as an {@code int}. Surrounding whitespace is
     * ignored, because {@link Properties} keeps trailing blanks of a value.
     *
     * @return the configured timeout
     * @throws NumberFormatException if the property is missing or is not a valid integer
     */
    public static int getZktimeout() {
        String raw = zkProperties.getProperty("zk.timeout");
        if (raw == null) {
            throw new NumberFormatException("Required property 'zk.timeout' is missing from zk.properties");
        }
        return Integer.parseInt(raw.trim());
    }

    /**
     * Scratch entry point: prints {@code "0101888"} without its last two characters.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String source = "0101888";
        System.out.println(source.substring(0, source.length() - 2));
    }
}
- Curator操作zk实现分布式锁
import net.lc7.util.ZkPropertiesUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;
/**
 * Distributed lock on ZooKeeper using Curator's {@code InterProcessMutex}: two threads
 * compete for the same lock path and each holds the lock for five seconds.
 *
 * @Author: Jason.zhu
 * @Create: 2019/05/27 11:28
 */
public class CuratorDistributeLock {
    private static String lockPath = "/lockPath";

    /**
     * Starts a Curator client, runs two competing workers, waits for both to finish and then
     * closes the client.
     *
     * @param args unused
     * @throws Exception if the client cannot be started or the main thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
                ZkPropertiesUtil.getZkServerIp(),
                new RetryNTimes(10, 5000));
        curatorFramework.start();
        try {
            System.out.println("zk client start successfully!!");
            Thread.sleep(1000L);

            Thread t1 = new Thread(() -> doWithLock(curatorFramework), "t1");
            Thread t2 = new Thread(() -> doWithLock(curatorFramework), "t2");
            t1.start();
            t2.start();

            // Wait for the workers so the client can be closed without cutting them off.
            t1.join();
            t2.join();
        } finally {
            curatorFramework.close();
        }
    }

    /**
     * Tries to take the shared lock for up to ten seconds, holds it for five seconds and
     * releases it. The lock is released only if it was actually acquired.
     *
     * @param curatorFramework started Curator client used to create the mutex
     */
    private static void doWithLock(CuratorFramework curatorFramework) {
        System.out.println("Client state : " + curatorFramework.getState());
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
        boolean acquired = false;
        boolean interrupted = false;
        try {
            // 10 seconds; the former (10*1000, SECONDS) mixed milliseconds with seconds.
            acquired = lock.acquire(10, TimeUnit.SECONDS);
            if (acquired) {
                System.out.println(Thread.currentThread().getName() + "hold lock!!");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + "release lock!!");
            } else {
                System.err.println(Thread.currentThread().getName() + " could not acquire lock in time");
            }
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (acquired) {
                // Releasing a mutex this thread does not own throws IllegalMonitorStateException.
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (interrupted) {
                // Restored after the release so the release itself runs uninterrupted.
                Thread.currentThread().interrupt();
            }
        }
    }
}
- Curator操作zk实现Leader选举
import net.lc7.util.ZkPropertiesUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;
/**
* @Description: Leader选举
* 当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。
* Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。
* 注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。
* autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
* @Author: Jason.zhu
* @Create: 2019/05/27 11:49
*/
public class CuratorLeaderClient {
private static String path = "/ensurePath";
public static void main(String[] args) throws InterruptedException {
LeaderSelectorListener listener = new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println(Thread.currentThread().getName() + " take leadership !!");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " relinquish leadership !!");
}
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
System.out.println(connectionState.name() + " state changed !! " + connectionState.isConnected());
}
};
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
Thread.sleep(Integer.MAX_VALUE);
}
public static void registerListener(LeaderSelectorListener listener){
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZkPropertiesUtil.getZkServerIp()
,new RetryNTimes(10, 5000));
curatorFramework.start();
//ensure path
try {
new EnsurePath(path).ensure(curatorFramework.getZookeeperClient());
} catch (Exception e) {
e.printStackTrace();
}
//register listener
LeaderSelector selector = new LeaderSelector(curatorFramework, path, listener);
selector.autoRequeue();
selector.start();
}
}
- Curator操作ZK实现监听功能
import net.lc7.util.ZkPropertiesUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.RetryNTimes;
/**
 * Watching ZooKeeper nodes with Curator caches.
 *
 * <p>Path Cache: watches a path for 1) creation and 2) deletion of child nodes and 3) updates
 * of child node data. Events are delivered to the registered PathChildrenCacheListener.
 * <br>Node Cache: watches creation, update and deletion of a single node and caches its data
 * locally.
 * <br>Tree Cache: a combination of Path Cache and Node Cache; watches create, update and
 * delete events under a path and caches the data of all child nodes.
 *
 * @Author: Jason.zhu
 * @Create: 2019/05/27 11:10
 */
public class CuratorWatchClient {
    private static String watcherNode = "/watcherNode";

    /**
     * Registers a {@code PathChildrenCache} listener on {@code /watcherNode}, prints every
     * received event and blocks. The cache and the client are closed when the method exits.
     *
     * @param args unused
     * @throws Exception if the client or cache cannot be started, or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        try (CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory.newClient(
                ZkPropertiesUtil.getZkServerIp(),
                new RetryNTimes(10, 5000))) {
            curatorFrameworkClient.start();
            System.out.println("zk client start successfully!!");

            //register watcher
            try (PathChildrenCache pathChildrenCache =
                         new PathChildrenCache(curatorFrameworkClient, watcherNode, true)) {
                pathChildrenCache.getListenable().addListener((client, event) -> {
                    ChildData data = event.getData();
                    if (null == data) {
                        System.out.println("no data in event [" + event + "]");
                    } else {
                        // Decode the payload; concatenating the byte[] itself would only
                        // print the array's identity (e.g. "[B@1b2c3d").
                        byte[] payload = data.getData();
                        String text = payload == null
                                ? "null"
                                : new String(payload, java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println(
                                "Receive event :" +
                                        "type=[" + event.getType() + "], " +
                                        "path=[" + data.getPath() + "]," +
                                        "data=[" + text + "]," +
                                        "state=[" + data.getStat() + "]"
                        );
                    }
                });
                pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                System.out.println(" Register zookeeper watcher successfully!! Please operate in terminal to show the listener function");
                Thread.sleep(Integer.MAX_VALUE);
            }
        }
    }
}
网友评论