一,Apache Zookeeper
zookeeper是一个分布式协调服务的开源框架,主要用来解决分布式集群中应用系统的一致性问题
【
关于一致性问题:
假设搭了一个hadoop集群,ABC三台服务器,需要配置文件,A上有一个配置文件a.xml,这时候三台电脑都需要这个配置文件,都需要的时候,配置文件需要谁来改呢?
==》
一个客户端程序有可能改配置文件的,修改A需要其他两台保持一致,否则集群不同步不协调,给集群自己来做也可以做,但是这个工作不通用,不是说我搭一个这样的集群需要这个服务,搭建另一个集群也需要,那通用。
这时候就出现zookeeper集群,就没有同步的问题了,那怎么保证一样呢?
==》大家都不用管了,每个机器自己改,其交给zookeeper去管理数据,配置文件。这是其中一个场景
】
其本质是一个分布式小文件存储系统,主要通过目录树方式,进行管理节点,从而维护和监控数据存储的状态变化。
【
zookeeper本身是一个分布式集群架构,可以存储小文件文件,和linux系统差不多:
==》 /itcast/a ip port 666
还可以维护和监控存储数据的状态变化,比如有三台服务器,怎么知道它是不是挂了
==》存在服务器上下线的动态感知,也就是当服务器启动之后,在zookeeper上写一个节点比如itcast001,那就在节点上注册监听器。
】
zookeeper特性
1,全局数据一致性
集群每个服务器保存一相同的数据副本,无论那个client,连接哪个服务器,数据展示都是一致的
2,可靠性
如果消息被一台服务器接收,所有服务器都接收, 一旦一次更改请求被应用,更改的结果就会被持久化,直到被下一次更改覆盖
3,顺序性
全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有Server上消息a都将在消息 b 前被发布;
偏序是指如果一个消息b在消息a 后被同一个发送者发布,a必将排在b前面。
4,数据更新原子性
一次数据更新要么成功(半数以上节点成功),要么失败,不存在中间状态
5,实时性
保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。
zookeeper集群角色
zookeeper是主从架构
包含leader,follower,observer
Zookeeper 集群工作的核心
事务请求(写操作)的唯一调度和处理者,保证集群事务处理的顺序性; 集群内部各个服务器的调度者。
对于 create,setData,delete 等有写操作的请求,则需要统一转发给leader处理,leader需要决定编号、执行操作,这个过程称为一个事务。
Follower:
处理客户端非事务(读操作)请求,转发事务请求给 Leader; 参与集群 Leader 选举投票。此外,针对访问量比较大的 zookeeper 集群,还可新增观察者角色。
Observer:
观察者角色,观察 Zookeeper 集群的最新状态变化并将这些状态同步过来,其对于非事务请求可以进行独立处理,对于事务请求,则会转发给 Leader 服务器进行处理。
和follower区别:不会参与任何形式的投票,
同样提供非事务服务,通常用于在不影响集群事务处理能力的前提下提升集群的非事务处理能力。
【
_事务性请求都会交给leader处理,由leader执行后分配其他follower
_事务性请求:添加、修改,删除
_非事务性请求:查询
先了解:
主从,主备概率
1,主从
主角色:leader,master
从角色:slaver,follower
常见的一主多从架构(storm,Hadoop)主从架构各司其职,互相配合,共同对外提供服务
2,主备
主角色:active
备角色:standby
主备角色常用于解决单点故障问题,常见的是一主多备,当主角色发生故障,备角色切换主角色
高可用架构:现实中集群大多是一主多从架构,同时给主角色备一个备角色。
zookeeper集群搭建
前提: 监测jdk是否安装成功
检测集群时间是否同步
检测防火墙是否关闭
检测主机 ip映射有没有配置
一般windows也会配置主机名和ip映射
检查SSH免密登录
安装步骤:
01) 上传zookeeper的安装包到一台服务器上
cd /export/software
rz 选择zookeeper的安装包进行上传
02) 解压zookeeper到指定的目录
tar -zxf zookeeper-3.4.5-cdh5.14.0.tar.gz -C /export/servers/
cd /export/servers/
03) 修改zookeeper的配置文件
cd /export/servers/zookeeper-3.4.5-cdh5.14.0/conf
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg
【dataDir】存储数据目录
#存储快照文件的目录,请不要使用tmp临时目录,这里仅仅只是案例缘故
_【Clientport】端口2181
修改内容如下:
dataDir=/export/data/zk
配置文件底部添加如下内容:
_让每台zookeeper都知道集群信息,选举编号=主机名:数据通信端口:选举端口_
server.1=hadoop01:2887:3887
server.2=hadoop02:2887:3887
server.3=hadoop03:2887:3887
备注:
_2181: zookeeper客户端连接端口
_2887: 服务器间通信的端口
_3887: 选举的端口
04) 将配置好的zookeeper分发给其他两台主机上
cd /export/servers/
scp -r zookeeper-3.4.5-cdh5.14.0/ root@hadoop02:$PWD //将zookeeper复制到node02的同级目录下
scp -r zookeeper-3.4.5-cdh5.14.0/ root@hadoop03:$PWD //将zookeeper复制到node03的同级目录下
备注:
scp远程拷贝
-r 拷贝文件夹
$PWD: 和当前目录一致
05) 分别在三台主机上创建数据存放目录和myid文件
hadoop01:执行的命令
mkdir -p /export/data/zk
echo "1" > /export/data/zk/myid //将1字符串写入myid这个文件中,文件位置和文件名不可以修改
cat /export/data/zk/myid //此命令用于查看此文件有没有正确写入 1
hadoop02:执行的命令
mkdir -p /export/data/zk
echo "2" > /export/data/zk/myid
cat /export/data/zk/myid //此命令用于查看此文件有没有正确写入 2
hadoop03:执行的命令
mkdir -p /export/data/zk
echo "3" > /export/data/zk/myid
cat /export/data/zk/myid //此命令用于查看此文件有没有正确写入 3
06) 配置zookeeper的环境变量
vim /etc/profile
export ZK_HOME=/export/servers/zookeeper-3.4.5-cdh5.14.0
export PATH=$PATH:$ZK_HOME/bin
source /etc/profile 让环境变量生效
07) 启动和关闭zookeeper集群
三台服务器逐个启动: zkServer.sh start
停止zookeeper的命令: zkServer.sh stop
08) 查看zookeeper集群的状态
zkServer.sh status
09) 编写一键启动脚本
cd/export/servers
vim startZKCluster.sh
#!/bin/sh
echo "启动zookeeper中...."
for host in hadoop01 hadoop02 hadoop03
do
ssh -q $host "source /etc/profile; /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start"
done
vim stopZKCluster.sh
#!/bin/sh
echo "启动zookeeper中...."
for host in hadoop01 hadoop02 hadoop03
do
ssh -q $host "source /etc/profile; /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh stop”
done
_进程:QuorumPeerMain
指的是 ZooKeeper 分布式模式安装。通常由2n+1台 servers 组成。为了保证 Leader 选举(基于 Paxos 算法的实现)能过得到多数的支持,所以 ZooKeeper 集群的数量一般为奇数。
Zookeeper 运行需要 java 环境,所以需要提前安装 jdk。对于安装leader+follower 模式的集群,大致过程如下:
1、配置主机名称到 IP 地址映射配置
2、修改 ZooKeeper 配置文件
3、远程复制分发安装文件
4、设置 myid
5、 启动 ZooKeeper 集群
如果要想使用 Observer 模式,可在对应节点的配置文件添加如下配置:
peerType=observer
其次,必须在配置文件指定哪些节点被指定为 Observer,如:
server.1:hadoop01:2181:3181:observer
日志文件
/bin ——zookeeper.out
tail -100f zookeeper.out
客户端连接
/bin———zkCli.sh
设置一键启动后可直接启动startZKCluser.sh start
shell基本操作
_stat path [watch]
_ set path data [version]
_ ls path [watch]
delquota [-n|-b] path
_ ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
_ delete path [version]
sync path
listquota path
_ rmr path
_ get path [watch]
_ create [-s] [-e] path data acl /EPHEMERAL 瞬息的/SEQUENTAIL顺序的
addauth scheme auth
_ quit
getAcl path
_ close
_ connect host:port_
zookeeper数据模型
01) 典型的树形层次结构
02) zookeeper树上的每个节点被称为znode,znode具有以下特性:
01) znode兼具有文件和目录的特性
既可以像文件存储数据,还可以作为路径标识的一部分,并且可以具有子znode.
02) znode通过路径引用
引用路径必须以斜杠(根目录)开头,必须是绝对路径,而且一个znode的路径都是唯一的
03) znode存放的数据量有限
znode数据存放在内存中,存放数据大小至多1M
04)Znode 具有原子性操作,
读操作将获取与节点相关的所有数据,写操作也将替换掉节点的所有数据。另外,每一个节点都拥有自己的 ACL(访问控制列表Access Control Lists ),这个列表规定了用户的权限,即限定了特定用户对目标节点可以执行的操作。
02) znode的类型
01) 两种节点类型
01) 临时节点
临时节点是指这个Znode的生命周期和客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除
02) 持久节点
持久节点是指一旦这个ZNode被创建了,除非主动进行ZNode的移除操作,否则这个ZNode将一直保存在Zookeeper上
02) znode的序列化特性
ZooKeeper还允许用户为每个节点添加一个特殊的属性: SEQUENTIAL.
一旦节点被标记上这个属性,那么在这个节点被创建的时候,Zookeeper会自动在其节点名后面追加上一个整型数字,
这个整型数字是一个由父节点维护的自增数字
03) 整体节点类型:
PERSISTENT: 永久节点
EPHEMERAL: 临时节点
PERSISTENT_SEQUENTIAL: 永久节点 序列化
EPHEMERAL_SEQUENTIAL: 临时节点 序列化
注意: 临时节点不能有子节点
每个 Znode 由 3 部分组成:
1 stat:此为状态信息, 描述该 Znode 的版本, 权限等信息
2 data:与该 Znode 关联的数据
3 children:该 Znode 下的子节点
节点属性
dataVersion:数据版本号,每次对节点进行 set 操作,dataVersion 的值都 会增加 1(即使设置的是相同的数据),可有效避免了数据更新时出现的先后顺 序问题。
cversion :子节点的版本号。当 znode 的子节点有变化时,cversion 的值 就会增加 1。
cZxid :Znode 创建的事务 id。
mZxid :Znode 被修改的事务 id,即每次对 znode 的修改都会更新 mZxid。 对于 zk 来说,每次的变化都会产生一个唯一的事务id,zxid(ZooKeeperTransaction Id)。通过 zxid,可以确定更新操作的先后顺序。例如,如果 zxid1 小于 zxid2,说明 zxid1 操作先于 zxid2 发生,zxid 对于整个 zk 都是唯一的, 即使操作的是不同的 znode。
ctime:节点创建时的时间戳.
mtime:节点最新一次更新发生时的时间戳. ephemeralOwner:如果该节点为临时节点, ephemeralOwner 值表示与该节点
绑定的 session id. 如果不是, ephemeralOwner 值为 0.
在 client 和 server 通信之前,首先需要建立连接,该连接称为 session。连接建立后,如果发生连接超时、授权失败,或者显式关闭连接,连接便处于 CLOSED 状态, 此时 session 结束。
3.4. ZooKeeper Watcher(监听机制)
ZooKeeper允许客户端向服务端注册一个Watcher监听,当服务端的一些事件触 发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通 知功能。总的来说可以概括Watcher 为以下三个过程:客户端向服务端注册Watcher、 服务端事件发生触发 Watcher、客户端回调Watcher得到触发事件情况。ls,ls2,get,stat path
Watch 机制特点
一次性触发
事件封装
ZooKeeper 使用 WatchedEvent 对象来封装服务端事件并传递。WatchedEvent 包含了每一个事件的三个基本属性:
1,通知状态(keeperState)
2,事件类型(EventType)
3,节点路径(path)
event 异步发送,watcher 的通知事件从服务端发送到客户端是异步的。
先注册再触发
Zookeeper 中的 watch 机制,必须客户端先去服务端注册监听,这样事件发送才会触发监听,通知给客户端。
通知状态和事件类型
同一个事件类型在不同的通知状态中代表的含义有所不同,下表列举了常见的通知状态和事件类型。
通知.jpg
java api
Watcher 接口表示一个标准的事件处理器,其定义了事件通知相关的逻辑, 包含 KeeperState 和 EventType 两个枚举类,分别代表了通知状态和事件类型, 同时定义了事件的回调方法:process(WatchedEvent event)。
Zookeeper工作原理
zookeeper是一个分布式协调服务的开源框架,主要用来解决分布式集群中应用系统的一致性问题。涉及到PAXOS算法,以及其数据更新原子性,当master挂掉,paxos算法发起投票,只要投票超过半数更新即通过。
paxos核心思想:当多数Server写成功,则任务数据写成功,也解释了为什么zookeeper集群数为基数,也就是高可用架构。
其本质是一个分布式小文件存储系统。主要通过目录树方式,进行管理节点,从而维护和监控数据存储的状态变化。通过服务器上下线的动态感知,也就是当服务器启动之后,在zookeeper上写一个节点比如itcast001,那就在节点上注册监听器。
简单的说zookeeper=文件系统+通知机制
代码逻辑:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZKClientDemo {
//connectString = IP 或者hosts已经配置了的主机名:端口号
public static final String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
// 首先使用直接赋值的字串的方式,字符串会在编译期生成在字符串池中。
// 然后final标记的变量(成员变量或局部变量)即成为常量,只能赋值一次。它应该不影响内存的分配。
// (查看资料多了,说法不一,在下对此也有点怀疑了,如果final影响内存分配,烦请各位大侠告知)
// 然后static修饰的String,会在堆内存中复制一份常量池中的值。
// 所以调用static final String 变量,实际上是直接调用堆内存的地址,不会遍历字符串池中的对象,节省了遍历时间。
public static final int sessionTimeout = 3000;
ZooKeeper zk = null;
/*创建测试节点创建的方法*/
@Before
public void init() throws IOException {
// 1,创建连接节点
// 先连接zookeeper,通过new Zookeeper对象,导入Zookeeper包
zk = new ZooKeeper(connectString,sessionTimeout,null);
//点击源代码,得知参数(String connectString , int sessionTimeout,Watcher watcher)
//连接字符串,超时时间,监听
/* 在 client 和 server 通信之前,首先需要建立连接,该连接称为 session。
连接建立后,如果发生连接超时、授权失败,或者显式关闭连接,连接便处于 CLOSED 状态,
此时session 结束。
*/
}
@Test
public void CreateTest() throws Exception {
// 2,创建永久节点
String s = zk.create("/itcast/demo2",
"萨摩耶".getBytes("utf-8"),//本身是字节数组,调用String的构造函数.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,//常用的权限列表在Ids.Open_ACL_UNSAFE,开放的
CreateMode.PERSISTENT //CreateMode枚举,包含-e -s节点
//==> 得到返回值S
);
System.out.println(s);
/*点击源码,得知参数(String path,byte[] data,List<ACL>acl 【访问控制列表 Access Control Lists】 ,createMode createMode)
* */
}
@Test
public void updateTest() throws Exception {
Stat stat = zk.setData("/itcast/demo", "金毛".getBytes("utf-8"), -1);//一般用-1
System.out.println(stat.getDataLength());
}
@Test
public void deleteTest() throws Exception {
zk.delete("/itcast/demo",-1);
}
@Test
public void queryTest() throws Exception {
//第三个参数stat,所需要获取的数据版本,null表示最新的版本数据节点
byte[] data = zk.getData("/itcast/demo", true, //是一个接口,通过new一个匿名内部类
null);
System.out.println(new String(data));
//获取子节点
List<String> children = zk.getChildren("/itcast", true);//watch是一个接口
for (String child : children) {
System.out.println(child);
}
}
/*释放资源*/
@After
public void after() throws InterruptedException {
zk.close();
}
}
public class ZKWatchDemo {
public static final String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
public static final int sessionTimeout = 3000;
ZooKeeper zk = null;
@Before
public void init() throws Exception {
// 1,创建连接节点
// 先连接zookeeper,通过new Zookeeper对象,导入Zookeeper包
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {//watcher是一个接口,可以通过new匿名内部类去实现
//触发监听会调用process方法(callback回调函数)
public void process(WatchedEvent watchedEvent) {
System.out.println("节点:" + watchedEvent.getPath() + "\n" + "类型:" + watchedEvent.getType());
}
});
}
@Test
public void watchData() throws KeeperException, InterruptedException {
byte[] data = zk.getData("/itcast", true, null);
System.out.println(new String(data));
//客户端不关闭
Thread.sleep(Long.MAX_VALUE);
}
@Test
public void watchChildNode() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/itcast", true);
for (String child : children) {
System.out.println(child);
Thread.sleep(Long.MAX_VALUE);
}
Thread.sleep(Long.MAX_VALUE);
}
}
网友评论