1 zk 简介
zk 管理大数据生态系统中各个组件。(Hadoop、Hive、Spark)

zk应用场景:
zk是一个经典的分布式数据一致性解决方案。致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调存储服务。
- 维护配置信息
- 分布式锁服务
- 集群管理
- 生成分布式唯一ID
维护配置信息:
在分布式集群上,需要保证每台机器的配置信息一致。例如数据库的url,用户名密码等。zk提供高可用、高性能的配置服务,能够快速高效的完成集群上配置项的更改,并且能够保证集群上机器配置数据的一致性。
zk 提供配置服务,使用Zab这种一致性协议来保证一致性。
例如在hbase中,客户端先连接zk,获取hbase集群的配置信息,然后才能操作。开源的消息队列kafka中,也是使用zk来维护broker信息。在dubbo中也广泛使用了zk来管理一些配置。

分布式锁服务:
在分布式系统中,多台服务器运行着相同的服务。当多个服务器在运行时就需要协调各个服务的进度。zk提出了临时有序节点的概念,通过加锁,保证当某个服务在调用时,其他服务不能进行该操作。如果机器挂掉,释放锁并fail over到其他机器继续执行该服务。

集群管理:
集群中有时因为各种软硬件故障或者网络故障,出现服务器挂掉而被移除出集群或者服务加入集群。zk提供了watch机制,能够将服务的移除/加入的情况通知给集群中其他正常工作的机器,以及时调整存储和计算等任务的分配和执行。zk还会对故障的机器做出诊断并尝试修复。

生成分布式唯一ID:
每次生成一个新ID时,zk会创建一个持久顺序节点,创建操作返回的节点序号,即为新ID,然后把比自己小的ID删掉。
zk的设计目标:
致力于为分布式应用提供一个高性能、高可用、具有严格顺序访问控制能力的分布式协调服务。
- 高性能
zk将全量数据存储在内存中,并直接服务于客户端的所有非事务请求。尤其适用于以读为主的场景。
- 高可用
zk以集群的方式对外提供服务。每台机器都会在内存中维护当前的服务器状态,并且各个机器之间互相保持通信。只要集群中超过一般的机器都能正常工作,那么集群就能正常对外提供服务。
- 严格顺序访问
对于来自客户端的每个请求,zk都会分配一个全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序。
2 zk数据模型
zk进行数据存储时的数据模型
zk的数据节点是树状结构(类似linux文件目录结构)。
树中的每个节点称为znode(zookeeper node),一个znode可用由多个子节点。zk的数据节点在结构上表现为树状结构。使用path来定位到某个znode。比如/wh/node1/node11/node112,wh是根节点。
znode兼具文件和目录两种特点。
节点既像文件一样维护了数据、元信息、ACL、时间戳等数据结构,又像目录一样构成了树结构。作为path的一部分。

znode大体上分为三部分:
- 节点的数据:即znode data(节点path,节点data的关系就像map的key-value一样)。
- 节点的子节点children
- 节点的状态stat:用来描述当前节点的创建、修改记录,包括cZxid、ctime等。
节点状态stat的属性:
在zk shell中,通过get命令查看指定路径节点的data、stat信息:

属性说明:
cZxid : 数据节点创建时的事务ID
ctime :数据节点创建时的时间
mZxid : 数据节点最后一次更新时的事务ID
mtime : 数据节点最后一次更新时的时间
pZxid : 数据节点的子节点最后一次被修改时的事务ID
cversion : 子节点的更改次数
dataVersion : 节点数据的更改次数
aclVersion : 节点的ACL的更改次数
ephemeralOwner : 如果节点是临时节点,则表示创建该节点的会话的sessionID;如果节点是持久节点,则该属性值为0
dataLength : 数据内存长度
numChildren : 数据节点当前子节点的数量
节点类型:
分为两种:临时节点和持久化节点。节点的类型在创建时即被确定,并且不能改变。
-
临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话(session)结束,临时节点就会被自动删除。当然也可用手动删除。虽然每个临时节点都会被绑定到一个客户端会话,但他们对所有的客户端都是可见的。临时节点不允许拥有子节点。
-
持久化节点:生命周期不依赖会话。并且只有在客户端执行删除操作的时候,才能被删除。
一般使用持久化有序节点来创建分布式唯一ID。
一般使用临时有序节点来创建分布式锁。
3 zk linux单机安装
zk依赖jdk




客户端登陆工具 ./zkCli.sh
./zkCli.sh -server ip
4 zk常用shell命令




无法直接delete含有子节点的节点,要有rmr命令。




ls2 = ls + stat

一个监听器的注册只能捕获一次事件

5 针对zk 数据节点的权限控制
Acl权限控制, Access control list






多个ip授权
setAcl /node2 ip:192.168.1.1:cdrwa,ip:192.168.1.2:cdr





6 zk JavaAPI操作zk数据
通过Java API去操作zk中存储的一系列数据。

package connection;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import javax.swing.plaf.IconUIResource;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkConnection {
public static void main(String[] args) {
try {
// 计数器
// zk创建是异步的
final CountDownLatch latch = new CountDownLatch(1);
// arg1 zk server ip port
// arg2 client timeout mills
// arg3 watcher
ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
// 阻塞直到连接创建成功
latch.await();
// 打印会话编号
System.out.println(zk.getSessionId());
System.out.println("all done...");
} catch (Exception e) {
e.printStackTrace();
}
}
}

package create;
import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkCreate1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void testCreate1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg2 : node data
// arg3 : 权限列表 world:anyone:cdrwa
// arg4 : 节点类型 持久化节点
zk.create("/wh/node2", "node2".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
package create;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZkCreate1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void testCreate1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg2 : node data
// arg3 : 权限列表 world:anyone:cdrwa
// arg4 : 节点类型 持久化节点
zk.create("/wh/node2", "node2".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void create2() throws Exception {
// arg3 : world:anyone:r
zk.create("/wh/node3", "node3".getBytes(),
ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void create3() throws Exception {
// world授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("world", "anyone");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.READ, id));
acls.add(new ACL(ZooDefs.Perms.WRITE, id));
zk.create("/wh/node4", "node4".getBytes(),
acls, CreateMode.PERSISTENT);
}
@Test
public void create4() throws Exception {
// ip授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("ip", "localhost");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zk.create("/wh/node5", "node5".getBytes(),
acls, CreateMode.PERSISTENT);
}
@Test
public void create5() throws Exception {
// auth授权模式
// 添加授权用户
zk.addAuthInfo("digest", "wh:1234".getBytes());
zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.CREATOR_ALL_ACL , CreateMode.PERSISTENT);
}
@Test
public void create6() throws Exception {
// auth授权模式
// 添加授权用户
zk.addAuthInfo("digest", "wh:1234".getBytes());
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("auth", "wh");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.READ, id));
zk.create("/wh/node5", "node5".getBytes(),
acls , CreateMode.PERSISTENT);
}
@Test
public void create7() throws Exception {
// digest授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("digest", "passwdMd5");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zk.create("/wh/node5", "node5".getBytes(),
acls , CreateMode.PERSISTENT);
}
@Test
public void create8() throws Exception {
// 持久化顺序节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(result); // 节点路径
}
@Test
public void create9() throws Exception {
// 临时节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL);
System.out.println(result); // 节点路径
}
@Test
public void create10() throws Exception {
// 临时顺序节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(result); // 节点路径
}
@Test
public void create11() throws Exception {
// 异步方式创建节点
zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
public void processResult(int rc, String path, Object ctx, String name) {
// 0 代表创建成功
System.out.println(rc);
// node path
System.out.println(path);
// node name
System.out.println(name);
// 上下文参数
System.out.println(ctx);
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package set;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZkSet1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void set1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg2 : new node data
// arg3 : 版本号,-1代表版本号不参与更新
Stat stat = zk.setData("/wh/node2", "node2".getBytes(), -1);
System.out.println(stat.getCzxid());
}
@Test
public void set2() throws KeeperException, InterruptedException {
// 异步方式修改节点
// arg1 : node path
// arg2 : new node data
// arg3 : 版本号,-1代表版本号不参与更新
zk.setData("/wh/node2", "node2".getBytes(), -1
, new AsyncCallback.StatCallback() {
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 0 代表创建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文参数
System.out.println(ctx);
// 属性描述对象
System.out.println(stat.getVersion());
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package delete;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkDel1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void del1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg2 : 版本号,-1代表版本号不参与更新
zk.delete("/wh/node2", -1);
}
@Test
public void del2() throws KeeperException, InterruptedException {
// 异步方式删除节点
// arg1 : node path
// arg2 : new node data
// arg3 : 版本号,-1代表版本号不参与更新
zk.delete("/wh/node2", -1
, new AsyncCallback.VoidCallback() {
public void processResult(int rc, String path, Object ctx) {
// 0 代表创建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文参数
System.out.println(ctx);
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package get;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkGet1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void get1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg3 : 读取节点属性的对象
Stat stat = new Stat();
byte[] res = zk.getData("/wh/node2", false, stat);
// 打印节点数据
System.out.println(new String(res));
// 版本信息
System.out.println(stat.getVersion());
}
@Test
public void get2() throws KeeperException, InterruptedException {
// 异步方式获取节点
// arg1 : node path
// arg3 : 版本号,-1代表版本号不参与更新
zk.getData("/wh/node2", false, new AsyncCallback.DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
// 0 代表创建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文参数
System.out.println(ctx);
// data
System.out.println(new String(data));
// 属性描述对象
System.out.println(stat.getVersion());
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package getChildren;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZkGetChild {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void getChild1() throws KeeperException, InterruptedException {
// arg1 : node path
List<String> res = zk.getChildren("/wh/node2", false);
// 打印节点数据
for (String child : res)
System.out.println(child);
}
@Test
public void get2() throws KeeperException, InterruptedException {
// 异步方式获取子节点
// arg1 : node path
zk.getChildren("/wh/node2", false, new AsyncCallback.ChildrenCallback() {
public void processResult(int rc, String path, Object ctx, List<String> children) {
// 0 代表创建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文参数
System.out.println(ctx);
// 属性描述对象
for (String child : children) {
System.out.println(child);
}
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package exist;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkExistNode {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void exists() throws KeeperException, InterruptedException {
// arg1 : node path
Stat stat = zk.exists("/wh/node2", false);
// 打印节点数据
System.out.println(stat);
// 版本信息
System.out.println(stat.getVersion());
}
@Test
public void exists1() throws KeeperException, InterruptedException {
// 异步方式获取节点
// arg1 : node path
zk.exists("/wh/node2", false, new AsyncCallback.StatCallback() {
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 0 代表创建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文参数
System.out.println(ctx);
// 属性描述对象
System.out.println(stat.getVersion());
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}
网友评论