前面介绍两种客户端(zkClient 与 Curator),后面实现分布式锁;注册中心见 Dubbo 专题。
zookeeper客户端
1.zkCli.sh(linux)(这里忽略,它内部启动的就是原生api的 ZooKeeperMain)
2.原生api(之前已介绍)
3.zkClient (https://github.com/sgroschupf/zkclient)
4.curator(http://curator.apache.org)
原理
zkClient 与 curator 的区别类似 `mybatis 与 hibernate` 的区别:二者都是对原生api的一种封装,只是封装程度不一样。curator 更复杂一些,但是它的 fluent(链式调用)风格不错!
简单使用
1.zkclient的使用
依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
相关crud及Test
package com.huey.zkclient.znode;
/**
* @author huey China.
* @Description : zkClient Crud
* @Date Created in 2018/11/18 下午2:55
*/
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.util.List;
/**
 * Thin generic wrapper around {@link ZkClient} that exposes basic znode
 * create / read / update / delete calls.
 *
 * <p>The client is built with a {@link SerializableSerializer}, so every value
 * handed to this class is passed through that serializer by the client.
 *
 * @param <T> type the caller expects {@link #readData(String)} to return
 */
public class ZkClientCrud<T> {

    /** Comma-separated list of ZooKeeper servers handed to the client. */
    private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";

    /** Underlying client; every method below delegates to it. */
    ZkClient zkClient;

    /** Opens the connection, passing 5000 for both timeout arguments of the client. */
    public ZkClientCrud() {
        SerializableSerializer serializer = new SerializableSerializer();
        this.zkClient = new ZkClient(this.connectString, 5000, 5000, serializer);
    }

    /**
     * Creates a persistent node, optionally creating missing parents.
     *
     * @param path          node to create
     * @param createParents forwarded unchanged to the client
     */
    public void createPersistent(String path, boolean createParents) {
        this.zkClient.createPersistent(path, createParents);
    }

    /**
     * Creates a persistent node holding {@code data}.
     *
     * @param path node to create
     * @param data value stored on the node
     */
    public void createPersistent(String path, Object data) {
        this.zkClient.createPersistent(path, data);
    }

    /**
     * Reads the value stored on a node.
     *
     * @param path node to read
     * @return whatever the client returns for that node
     */
    public T readData(String path) {
        T payload = this.zkClient.readData(path);
        return payload;
    }

    /**
     * Lists the children of a node.
     *
     * @param path parent node
     * @return child names as returned by the client
     */
    public List<String> getChildren(String path) {
        List<String> children = this.zkClient.getChildren(path);
        return children;
    }

    /**
     * Overwrites the value stored on a node.
     *
     * @param path   node to update
     * @param object new value
     */
    public void writeData(String path, Object object) {
        this.zkClient.writeData(path, object);
    }

    /**
     * Deletes a node through the client's recursive delete.
     *
     * @param path node to remove
     */
    public void deleteRecursive(String path) {
        this.zkClient.deleteRecursive(path);
    }
}
package com.huey.zkclient.znode;
import org.junit.Test;
/**
 * Manual demo and JUnit test for {@link ZkClientCrud}.
 *
 * <p>Author: huey, China. Created 2018/11/18 3:05 PM.
 */
public class ZkclientTest {

    /**
     * Creates {@code /huey_zkClient} holding a {@code User}, reads it back,
     * updates the age, then reads it again. Both reads are printed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Parameterized instead of a raw type so readData is typed as User.
        ZkClientCrud<User> zkClientCrud = new ZkClientCrud<>();
        User user = new User();
        user.setAge(18);
        user.setName("huey");
        zkClientCrud.createPersistent("/huey_zkClient", user);
        System.out.println(zkClientCrud.readData("/huey_zkClient"));
        user.setAge(20);
        zkClientCrud.writeData("/huey_zkClient", user);
        System.out.println(zkClientCrud.readData("/huey_zkClient"));
    }

    /** Removes {@code /huey_zkClient} through the wrapper's recursive delete. */
    @Test
    public void testDel() {
        ZkClientCrud<User> zkClientCrud = new ZkClientCrud<>();
        // The original built a User here that was never used; it has been dropped.
        zkClientCrud.deleteRecursive("/huey_zkClient");
    }
}
watcher
package com.huey.zkclient.watcher;
/**
* @author huey China.
* @Description : zkClientWatcher
* @Date Created in 2018/11/18 下午2:54
*/
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.Watcher;
import java.util.List;
/**
 * Wrapper around {@link ZkClient} that offers the basic node operations plus
 * {@link #lister(String)}, which registers data, child and connection-state
 * listeners that print every event they receive.
 *
 * @param <T> type the caller expects {@link #readData(String)} to return
 */
public class ZkClientWatcher<T> {
    ZkClient zkClient;
    private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";

    /** Opens the connection with a {@link SerializableSerializer} and 5000 for both timeout arguments. */
    public ZkClientWatcher() {
        this.zkClient = new ZkClient(connectString, 5000, 5000, new SerializableSerializer());
    }

    /**
     * Reads the value stored on a node.
     *
     * @param path node to read
     * @return whatever the client returns for that node
     */
    public T readData(String path) {
        return zkClient.readData(path);
    }

    /**
     * Lists the children of a node.
     *
     * @param path parent node
     * @return child names as returned by the client
     */
    public List<String> getChildren(String path) {
        return zkClient.getChildren(path);
    }

    /**
     * Overwrites the value stored on a node.
     *
     * @param path   node to update
     * @param object new value
     */
    public void writeData(String path, Object object) {
        zkClient.writeData(path, object);
    }

    /**
     * Deletes a node through the client's recursive delete.
     *
     * @param path node to remove
     */
    public void deleteRecursive(String path) {
        zkClient.deleteRecursive(path);
    }

    /**
     * Creates a persistent node holding {@code data}.
     *
     * @param path node to create
     * @param data value stored on the node
     */
    public void createPersistent(String path, Object data) {
        zkClient.createPersistent(path, data);
    }

    /**
     * Registers three listeners: data changes on {@code path}, child changes
     * under {@code path}, and connection-state changes of the client itself.
     * Each listener only prints what it receives.
     *
     * @param path node to watch
     */
    public void lister(String path) {
        // Data written to, or deletion of, the node itself.
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                // %n added: without a line terminator consecutive events ran together on one line.
                System.out.printf("变更的节点为:%s,%s%n", dataPath, data);
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.printf("删除的节点为:%s%n", dataPath);
            }
        });

        // Children added to or removed from the node.
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("parentPath: " + parentPath + ",currentChilds:" + currentChilds);
            }
        });

        // Connection state of the client (not tied to any node).
        zkClient.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                if (state == Watcher.Event.KeeperState.SyncConnected) {
                    // Author's note: fired when the server was started again.
                    System.out.println("连接成功");
                } else if (state == Watcher.Event.KeeperState.Disconnected) {
                    // Author's note: fired when the zk service was stopped on the server.
                    System.out.println("连接断开");
                } else {
                    System.out.println("其他状态" + state);
                }
            }

            @Override
            public void handleNewSession() throws Exception {
                System.out.println("重建session");
            }

            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception {
                // Previously an empty body: a failed session re-establishment went unnoticed.
                System.err.println("session establishment failed: " + error);
            }
        });
    }
}
package com.huey.zkclient.watcher;
import com.huey.zkclient.znode.User;
import org.junit.Test;
/**
 * Demo and tests for {@link ZkClientWatcher}.
 *
 * <p>Author's notes: the ZkClient connection is created with
 * {@code new SerializableSerializer()}, so values stored on a node are
 * serialized byte arrays. A value written from the zkCli.sh console with
 * {@code set /xxx/xx} is stored as plain string bytes; the data-change event
 * still fires, but zkClient cannot deserialize that value. ZkClientWatcher
 * configures a serializer (see
 * {@code org.I0Itec.zkclient.ZkClient#ZkClient(IZkConnection, int, ZkSerializer)})
 * while zkCli.sh does not. As a check, deleting and creating nodes from
 * zkCli.sh were both picked up by the listeners.
 *
 * <p>Author: huey, China. Created 2018/11/18 3:39 PM.
 */
public class ZkClientWatcherTest {

    /** Node every scenario in this class works on. */
    private static final String NODE_PATH = "/huey_zkClient";

    private static ZkClientWatcher zkClientWatcher = new ZkClientWatcher();

    /**
     * Clears the node, registers the listeners, creates the node with a user,
     * updates it two seconds later, then keeps the process alive so later
     * events can still be printed.
     *
     * @param args unused
     * @throws InterruptedException if a sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        zkClientWatcher.deleteRecursive(NODE_PATH);
        zkClientWatcher.lister(NODE_PATH);

        User huey = new User();
        huey.setAge(18);
        huey.setName("huey");
        zkClientWatcher.createPersistent(NODE_PATH, huey);

        Thread.sleep(2000);

        huey.setAge(23);
        zkClientWatcher.writeData(NODE_PATH, huey);

        // Block so the listeners stay registered.
        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Writes the current time in milliseconds to the node. */
    @Test
    public void testUpdate() {
        long now = System.currentTimeMillis();
        zkClientWatcher.writeData(NODE_PATH, now);
    }

    /** Deletes the node through the wrapper's recursive delete. */
    @Test
    public void testDel() {
        zkClientWatcher.deleteRecursive(NODE_PATH);
    }
}
2.curator的使用
依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
相关crud及Test
package com.huey.curator.znode;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
 * Small CRUD helper on top of a {@link CuratorFramework} client. Node payloads
 * are plain strings, encoded and decoded as UTF-8.
 *
 * <p>Every method catches {@link Exception}, prints the stack trace and
 * carries on, so failures never propagate to the caller.
 */
public class CuratorCrud {
    private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";
    CuratorFramework cf;

    /** Builds and starts the client. */
    public CuratorCrud() {
        // 1. Retry policy: exponential backoff with arguments 1000 (ms) and 10 (retries).
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2. Build the client through the factory.
        cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        // 3. Open the connection.
        cf.start();
    }

    /**
     * Creates a persistent node, creating missing parents as needed.
     *
     * @param path node to create
     * @param data payload, stored as UTF-8 bytes
     * @return the path reported by Curator for the created node, or
     *         {@code null} if creation failed (previously {@code null} was
     *         returned even on success because the result was discarded)
     */
    public String createPersistent(String path, String data) {
        try {
            return cf.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    // Explicit charset: the no-arg getBytes() depends on the platform default.
                    .forPath(path, data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Reads a node's payload as a UTF-8 string.
     *
     * @param path node to read
     * @return the decoded payload, or {@code null} if the node holds no data
     *         or the read failed
     */
    public String getData(String path) {
        try {
            byte[] raw = cf.getData().forPath(path);
            return raw == null ? null : new String(raw, java.nio.charset.StandardCharsets.UTF_8);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Deletes a node and its children, using Curator's guaranteed delete.
     *
     * @param path node to delete
     */
    public void delete(String path) {
        try {
            cf.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Overwrites a node's payload.
     *
     * @param path node to update
     * @param data new payload, stored as UTF-8 bytes
     */
    public void setData(String path, String data) {
        try {
            cf.setData().forPath(path, data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.huey.curator.znode;
/**
 * Smoke demo for {@link CuratorCrud}: creates {@code /huey/abc} and prints the
 * value read back. The other operations follow the same pattern.
 *
 * <p>Author: huey, China. Created 2018/11/18 4:04 PM.
 */
public class CuratorTest {

    /**
     * Runs the create-then-read round trip.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String nodePath = "/huey/abc";
        CuratorCrud crud = new CuratorCrud();

        crud.createPersistent(nodePath, "abc");

        String stored = crud.getData(nodePath);
        System.out.println(stored);
    }
}
package com.huey.curator.watcher;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.RetryNTimes;
/**
 * Demo of Curator's {@link PathChildrenCache}: prints every child event seen
 * under {@code /curator_test}.
 *
 * <p>Author: huey, China. Created 2018/11/18 4:09 PM.
 */
public class CuratorWatcherTest {
    /** ZooKeeper connection string. */
    private static String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";
    private static final String ZK_PATH = "/curator_test";

    /**
     * Connects, registers the child listener, then blocks so events keep
     * being printed.
     *
     * @param args unused
     * @throws Exception if starting the cache fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        // 1. Connect to ZooKeeper.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2. Register a watcher for child-node events.
        PathChildrenCache watcher = new PathChildrenCache(
                client,
                ZK_PATH,
                true // cache node data
        );
        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                // The payload can be absent; new String(null) used to throw
                // a NullPointerException inside the listener.
                byte[] payload = data.getData();
                String text = payload == null
                        ? "null"
                        : new String(payload, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + text + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        System.out.println("Register zk watcher successfully!");

        // Keep the JVM alive so the listener keeps receiving events.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
分布式锁
1.原生api简单模拟
package com.huey.locks;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/***
* 唯一特性 重复获取
*/
public class WkLock {
private ZooKeeper zookeeper;
private String path = "/huey";
private CountDownLatch latch=null;
public WkLock(String host, String path) {
try {
this.zookeeper =new ZooKeeper(host, 3000, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
} catch (IOException e) {
e.printStackTrace();
}
this.path = path;
}
/**
* @author huey China.
* @Description : 同步锁控制唯一性
* @Date Created in 2018/11/18 下午4:25
*/
public void lock() {
try {
zookeeper.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e) {
this.latch = new CountDownLatch(1);
try {
this.latch.await(1000, TimeUnit.MILLISECONDS);//等待,这里应该一直等待其他线程释放锁 来个线程
} catch (InterruptedException e1) {
e1.printStackTrace();
}
this.latch = null;
lock();
}
}
/**
* @author huey China.
* @Description : 释放锁
* @Date Created in 2018/11/18 下午4:26
*/
public void unlock() {
try {
zookeeper.delete(path, -1);
} catch (Exception e) {
}
}
}
package com.huey.locks;
/**
 * Demo for {@link WkLock}: two threads each increment a shared counter 300
 * times while holding the lock, then the total is printed.
 *
 * <p>Author: huey, China. Created 2018/11/18 4:31 PM.
 */
public class WukongLockTest implements Runnable {
    WkLock wkLock = new WkLock("192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181", "/wklock");
    static int i = 0;

    /**
     * Starts both workers, waits for them to finish and prints the counter.
     *
     * @param args unused
     * @throws InterruptedException if joining a worker is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        WukongLockTest lockTest2 = new WukongLockTest();
        Thread t1 = new Thread(lockTest2);
        Thread t2 = new Thread(lockTest2);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }

    /** Increments the shared counter 300 times, taking the lock around each increment. */
    @Override
    public void run() {
        try {
            for (int j = 0; j < 300; j++) {
                wkLock.lock();
                try {
                    i++;
                } finally {
                    // Release even if the guarded section throws, so the other worker is not blocked.
                    wkLock.unlock();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.Curator内置API
package com.huey.locks;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Demo of Curator's built-in distributed lock: two threads each increment a
 * shared counter 300 times while holding an {@link InterProcessMutex}.
 */
public class CuratorLockTest implements Runnable {
    final static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181").retryPolicy(new ExponentialBackoffRetry(100, 1)).build();
    static int i = 0;

    /**
     * Curator's distributed lock on {@code /lock}. It is a re-entrant mutex,
     * so it is closer to a JUC lock than to a semaphore.
     *
     * <p>Author: huey, China. Created 2018/11/18 4:30 PM.
     */
    final InterProcessMutex lock = new InterProcessMutex(client, "/lock");

    /**
     * Starts the client, runs both workers, prints the counter and closes the
     * client.
     *
     * @param args unused
     * @throws InterruptedException if joining a worker is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        client.start();
        try {
            CuratorLockTest lockTest2 = new CuratorLockTest();
            Thread t1 = new Thread(lockTest2);
            Thread t2 = new Thread(lockTest2);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(i);
        } finally {
            // The client was never closed before, leaving its session and threads open.
            client.close();
        }
    }

    /** Increments the shared counter 300 times, acquiring the mutex around each increment. */
    @Override
    public void run() {
        try {
            for (int j = 0; j < 300; j++) {
                lock.acquire();
                try {
                    i++;
                } finally {
                    // Release even if the guarded section throws.
                    lock.release();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
运行结果为600
总结
原生api的锁利用节点路径的唯一性实现(同一路径只能被成功创建一次);原生方式及curator的性能都略差,只适合低并发场景。zk做分布式锁并不是很好,redis实现更好一些(待续)。
参考
官网:http://zookeeper.apache.org
书籍:从Paxos到Zookeeper
网课: 推荐 慕课网 图灵学院 谷粒学院
网友评论