初识zookeeper
那一年,听见一个叫做dubbo的东西,据说是阿里的,只知道这个牛逼的东西能让你在不同的系统之间的远程调用像本地方法一样去调用,我心甚往,遂观之,然识君。当时觉得他就是个注册中心,后来闲暇时深入研究了下,一发不可收拾,发现zookeeper竟然能做如此多的事情,请听我慢慢道来。
zookeeper的数据类型
我理解的zk其实是一个文件系统,比如:/user/home,/opt/logs等,这些节点在zk中都称为znode,znode的类型分为四种:临时节点,持久节点,临时顺序节点,持久顺序节点
- 临时节点
如果客户端和zk服务端的会话失效(并非连接断开),那么这个节点会被自动删除掉,临时节点不能有子节点 - 持久节点
持久节点创建后就算客户端会话失效,也不会被删除,除非客户端手动删除 - 临时顺序节点
所谓顺序节点,指的是在创建节点的时候会给节点名称后面加上一个10位的数字后缀,数字的范围是0-整数最大值,比如创建的节点名称叫order,那么最终在zk中创建出来的节点名称是:order0000000001,order0000000002, - 持久顺序节点
持久顺序节点和临时顺序节点差不多,只不过他不会随着客户端的会话失效而被删除
在zk的树状文件系统中,同一级目录下不能有同名的znode,当有多个客户端同时向zk集群服务端发起创建同样路径的节点请求时,最终只会有一个客户端创建成功,基于这个特性我们可以来实现分布式锁
zookeeper的用法特性
客户端创建节点之前需要连接zk服务,连接代码如下:
/**
* @author 吴镇
* @description:创建一个简单的zookeeper会话
* @createdate 2016/9/9
*/
public class ZkSimple implements Watcher{
private static ZkSimple zkSimple = null;
//在当前线程中调用CountDownLatch的await方法会使得当前线程一直处于阻塞状态,每次调用countDown
//方法会使得计数器减一
//直到计数器变成0,当前线程即可恢复执行
//可用来使当前线程等待其他线程完成任务
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
private static Stat stat = new Stat();
/**
* 负责处理来自zk服务端的通知
* @param watchedEvent
*/
public void process(WatchedEvent watchedEvent) {
System.out.println("接收watched事件:" + watchedEvent);
if(KeeperState.SyncConnected == watchedEvent.getState()){
//判断事件类型
if(Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()){
//将计数器减一
countDownLatch.countDown();
}else if(Event.EventType.NodeChildrenChanged == watchedEvent.getType()){//子节点变更事件
try {
List<String> list = zooKeeper.getChildren(watchedEvent.getPath(),true);
System.out.println("子节点变更后重新获取节点数据:"+list);
} catch (Exception e) {
e.printStackTrace();
System.out.println("获取子节点失败");
}
}else if(Event.EventType.NodeDataChanged == watchedEvent.getType()){ //节点数据变更事件
String data = "";
try {
//数据变更后从新获取数据,并且重新注册watcher
data = ZkDataSync.getData(watchedEvent.getPath());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("数据变更后获取的数据为:" + data);
}
}
}
public static ZooKeeper connectZkService(String connectString, int timeOut) throws IOException {
//参数说明
//参数1:连接zk服务列表,如果有多台zk节点,使用逗号隔开
//参数2:指会话超时时间,单位毫秒
//参数3:为一个实例化的watcher
//构造方法内部实现了与zk服务端之间的TCP连接创建
zooKeeper = new ZooKeeper(connectString,timeOut,new ZkSimple());
System.out.println("当前连接状态:"+zooKeeper.getState());
//由于zk服务端的连接是一个异步的过程,所以这里先阻塞当前线程
//一直阻塞当前main线程,直到计数器为0是抛出异常
try {
countDownLatch.await();
} catch (InterruptedException e) {}
//main线程阻塞状态解除
System.out.println("zk会话已建立");
return zooKeeper;
}
public static void main(String[] args) throws IOException, InterruptedException {
// ZkSimple.connectZkService("10.21.40.55:2181",5000);
}
}
创建连接的类中,实现了Watcher接口,在创建连接的时候将这个类的实例作为参数带过去,表示这个类就是一个Watcher,并且实现了Watcher接口的process方法,watcher在zk中是一个非常重要的概念,当客户端创建节点的时候,当前节点便有了一个watcher,当节点创建数据或者数据变更或者子节点变更的时候,客户端都能接收到这个变化的事件(就是process方法),但是,仅仅只能收到变更的事件,而不能收到变更的内容。timeout表示会话超时时间,单位毫秒,由于创建连接的过程是异步的,所以这里使用CountDownLatch的await方法来阻塞main线程,直到zk服务创建成功(即收到服务端的回调请求,调用process方法),至此连接创建成功。其中process方法实现了Watcher接口中的方法,为zk服务端的回调方法。
当连接创建完了之后,便可以开始创建节点和数据了,客户端向zk注册节点的时候有两种方式:同步和异步,我们来试试同步创建一个临时顺序节点试试:
/**
* @author 吴镇
* @description:
* @createdate 2016/9/13
*/
public class ZkCreateNodeSync {
public static String createZnode(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException {
ZooKeeper zooKeeper = null;
try {
zooKeeper = ZkSimple.connectZkService("10.21.40.55:2181",1000 * 10);
} catch (IOException e) {
System.out.println("连接zk服务端异常");
}
return zooKeeper.create(path,data,acl,createMode);
}
public static void main(String[] args) {
try {
//CreateMode.EPHEMERAL-->创建临时节点CreateMode.EPHEMERAL_SEQUENTIAL-->创建临时顺序节点
String path = ZkCreateNodeSync.createZnode("/我是一个临时节点","".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("创建znode的路径为:"+path);
} catch (Exception e) {
e.printStackTrace();
System.out.println("创建znode失败");
}
}
}
创建一个会话超时时间为10S的zk连接,然后创建一个临时顺序节点后,我们去看看zk服务端的节点状况,采用ls命令来查看节点:
ls /
[我是一个临时节点0000000011, dubbo, zk, test, zookeeper]
可以看到临时节点被创建了,并且节点名称后面被添加了0000000011,在10s后,这个节点自动被zk删除了。
再来看看异步创建节点,异步创建节点,需要提供一个回调函数:
/**
* @author 吴镇
* @description:异步创建节点
* @createdate 2016/9/13
*/
public class ZkCreateNodeAsync {
public static void createZnode(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback sc, Object ctx) throws KeeperException, InterruptedException {
ZooKeeper zooKeeper = null;
try {
zooKeeper = ZkSimple.connectZkService("10.21.40.55:2181",1000 * 20);
} catch (IOException e) {
System.out.println("连接zk服务端异常");
}
//异步创建节点
zooKeeper.create(path,data,acl,createMode,sc,ctx);
}
public static void main(String[] args) {
try {
//CreateMode.EPHEMERAL-->创建临时节点 CreateMode.EPHEMERAL_SEQUENTIAL-->创建临时顺序节点
ZkCreateNodeAsync.createZnode("/我是一个异步创建的节点临时","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(),"我是一个上下文");
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
System.out.println("异步创建znode失败");
}
}
}
class IStringCallback implements AsyncCallback.StringCallback{
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("创建节点回调数据:["+rc+"," + path + "," + ctx + ", 真实的路径名称为:" + name);
}
}
其中IStringCallback类就是提供回调函数的,回调函数中会返回这个节点的全路径,其实同步和异步创建过程基本相同,只不过异步不阻塞创建线程而已。当在某一个节点下面创建子节点的时候,会收到zk服务端的通知(注册了watcher),事件类型为:Event.EventType.NodeChildrenChanged(子节点变更事件),变更事件中包含变更的节点路径,根据这个路径我们可以重新通过getChildren方法去获取子节点列表。数据变更的事件为:Event.EventType.NodeDataChanged(节点数据变更事件),可以通过getData方法来获取节点变更的数据,<font color='red'>根据以上特性,我们可以使用zookeeper来实现集中式配置中心</font>。创建数据这里就不多将了,也可以分为同步和异步创建,有一点需要注意,zk的数据只能是byte类型的。
zookeeper的客户端
在上面已经实现了简单的客户端代码,只不过,还有诸多问题,比如,zookeeper的watcher是一次性的,当收某个节点的通知的时候,这个节点上的watcher就会被删除掉了,在实际应用中,频繁的去注册watcher会很浪费资源。还有当会话超时之后,没有重新连接的机制。还有数据类型只有简单的byte,没有对象级别的存储。还有删除节点的时候无法级联删除。针对这些问题,有了一些开源的客户端:Curator和ZkClient,这两个客户端各有优劣,在dubbo中就是使用了ZkClient,ZkClient解决了session超时,watcher一次性等问题,但是文档几乎没有,异常处理过于简单,没有提供现成的各种场景实现。Curator相对来说好很多,也是解决了zk存在的那些问题,并且提供了分布式锁, Master选举,分布式计算器等现成的实现,并且这货现在还是apache的顶级项目,这个身价是ZkClient没法比的呢!!!!~~
zookeeper可以做啥
-
分布式锁
分布式锁的概念在redis那一章中已经介绍过了,这里不多说,先来说一下zk实现分布式锁的功能的大概步骤
1、首先创建一个持久节点,比如叫/lock_node
2、参与获得锁的客户端在/lock_node下面创建临时顺序节点,比如/lock_node/clientA,/lock_node/clientB
3、当前客户端调用getChildren获得/lock_node节点下的所有子节点,且不设置监听
4、将步骤3中获取的节点做排序(按照顺序节点后面的数字来排序),获取其中编号最小的节点名称,然后和自己创建的那个节点进行比较,如果一样,则获取锁成功,如果不是,则watch比自己次小的那个节点
5、如果4步骤中的watch被触发,则继续从步骤3开始去尝试获取锁 -
集中式配置中心
试想一个场景,对于java服务端,有订单系统,库存系统,支付系统等等其他系统,并且每个系统都是集群部署,当需要修改某一个配置的时候,我们需要每一个节点去修改,然后重新部署服务,当节点数量很多的时候,非常麻烦,所以能不能有一种方式能统一修改,并且是热修改呢?zookeeper的watcher特性就非常适合来做这件事情。
针对每一个应用,我们可以在zk上面创建一个对应的节点,比如我们现在有个库存应用叫stock,那么我们可以在zk上建立如下节点:/config/stock,然后对应的配置如下,redis.url,redis.port等等,对应的配置需要的/config/stock下面建立对应的znode如下:/config/stock/redis.url,/config/stock/redis.port,然后将配置对应的value存放在对应的znode下面,注意,这些节点都是持久节点,每一个应用客户端监听自己对应的节点下面的数据变更,当watch被触发的时候,客户端重新去获取变更的数据,然后通过spring提供的配置管理类来将配置重新加载到内存中来实现热修改配置的目的。所有配置需要预先初始化到zk节点中,可以通过命令行操作或者管理界面来初始化。详细代码见我的码云:https://git.oschina.net/imy3069776986/bee -
master选举
master选举其实和分布式锁实现非常类似,只不过分布式锁,是在并发量大的情况下为了防止并发对公共资源的访问无序,所以让并发的请求串行访问。而master选举,是指集群中的节点只允许某一个节点执行某个任务,其他节点不允许执行,也可以利用zk的临时顺序节点来实现,只不过一旦发现自己节点不是最小的那一个,则不需要继续监听了(即任务不需要执行),最终这个任务由节点最小的那个去执行
网友评论