一、产生背景
项目从单体到分布式转变之后,将会产生多个节点之间协同的问题。如:
1.每天的定时任务由谁哪个节点来执行?
2.RPC调用时的服务发现?
3.如何保证并发请求的幂等
4.....
这些问题可以统一归纳为多节点协调问题,如果靠节点自身进行协调这是非常不可靠的,性能上也不可取。必须由一个独立的服务做协调工作,它必须可靠,而且保证性能。
二、概要
ZooKeeper是用于分布式应用程序的协调服务。它公开了一组简单的API,分布式应用程序可以基于这些API用于同步,节点状态、配置等信息、服务注册等信息。其由JAVA编写,支持JAVA 和C两种语言的客户端。
znode 节点
zookeeper 中数据基本单元叫节点,节点之下可包含子节点,最后以树级方式程现。每个节点拥有唯一的路径path。客户端基于PATH上传节点数据,zookeeper 收到后会实时通知对该路径进行监听的客户端。
常规配置文件说明
// zookeeper时间配置中的基本单位 (毫秒) 心跳时间
tickTime=2000
// 允许follower初始化连接到leader最大时长,它表示tickTime时间倍数 即:initLimit*tickTime
initLimit=10
// 允许follower与leader数据同步最大时长,它表示tickTime时间倍数
syncLimit=5
// zookeper 数据存储目录
dataDir=/tmp/zookeeper
//对客户端提供的端口号
clientPort=2181
// 单个客户端与zookeeper最大并发连接数
maxClientCnxns=60
// 保存的数据快照数量,之外的将会被清除
autopurge.snapRetainCount=3
// 自动触发清除任务时间间隔,小时为单位。默认为0,表示不自动清除。
autopurge.purgeInterval=1
三、Zookeeper节点介绍
zookeeper 中节点叫znode存储结构上跟文件系统类似,以树级结构进行存储。不同之外在于znode没有目录的概念,不能执行类似cd之类的命令。znode结构包含如下:
- path:唯一路径
- childNode:子节点
- stat:状态属性
- type:节点类型
节点类型
类型 | 描述 |
---|---|
PERSISTENT | 持久节点,默认创建节点 |
PERSISTENT_SEQUENTIAL | 持久序号节点 |
EPHEMERAL | 临时节点(不可在拥有子节点) |
EPHEMERAL_SEQUENTIAL | 临时序号节点(不可在拥有子节点) |
PERSISTENT(持久节点)
持久化保存的节点,也是默认创建的
默认创建的就是持久节点
create /test
PERSISTENT_SEQUENTIAL(持久序号节点)
创建时zookeeper 会在路径上加上序号作为后缀,。非常适合用于分布式锁、分布式选举等场景。创建时添加 -s 参数即可。
创建序号节点
create -s /test
返回创建的实际路径
Created /test0000000001
create -s /test
返回创建的实际路径2
Created /test0000000002
EPHEMERAL(临时节点)
临时节点会在客户端会话断开后自动删除。适用于心跳,服务发现等场景。创建时添加参数-e 即可。
创建临时节点, 断开会话 在连接将会自动删除
create -e /temp
EPHEMERAL_SEQUENTIAL(临时序号节点)
与持久序号节点类似,不同之处在于EPHEMERAL_SEQUENTIAL是临时的会在会话断开后删除。创建时添加 -e -s
create -e -s /temp/seq
节点属性
查看节点属性
stat /data
其属性说明如下表:
//创建节点的事物ID
cZxid = 0x385
//创建时间
ctime = Tue Sep 24 17:26:28 CST 2019
//修改节点的事物ID
mZxid = 0x385
//最后修改时间
mtime = Tue Sep 24 17:26:28 CST 2019
//子节点变更的事物ID
pZxid = 0x385
//这表示对此znode的子节点进行的更改次数(不包括子节点)
cversion = 0
//数据版本,变更次数
dataVersion = 0
//权限版本,变更次数
aclVersion = 0
//临时节点所属会话ID,0x0为空,代表不是临时节点
ephemeralOwner = 0x0
//数据长度
dataLength = 17
//子节点数(不包括子子节点)
numChildren = 0
节点的监听
客户添加 -w 参数可实时监听节点与子节点的变化,并且实时收到通知。非常适用保障分布式情况下的数据一至性。其使用方式如下:
命令 | 描述 |
---|---|
ls -w path | 监听子节点的变化(增,删) |
get -w path | 监听节点数据的变化 |
stat -w path | 监听节点属性的变化 |
printwatches on/off | 触发监听后,是否打印监听事件(默认on) |
注意1:zookeeper中的监听机制watch event,一个客户端改变,其它客户端都可以监听的到,监听事件默认是开启的。
注意2:zookeeper中的监听机制watch event是一次性的,event再次触发,其它客户端就监听不到了。可以通过客户端触发watch event后再次添加监听实现实时监听。
注意3:zookeeper中的临时节点在退出的时候会删除节点文件,这也是它可以被监听到的原因。
acl权限设置
ACL全称为Access Control List(访问控制列表),用于控制资源的访问权限。ZooKeeper使用ACL来控制对其znode的防问。基于scheme:id:permission的方式进行权限控制。scheme表示授权模式、id模式对应值、permission即具体的增删改权限位。
1.scheme:认证模型
方案 | 描述 |
---|---|
world | 开放模式,world表示全世界都可以访问(这是默认设置) |
ip | ip模式,限定客户端IP防问 |
auth | 用户密码认证模式,只有在会话中添加了认证才可以防问 |
digest | 与auth类似,区别在于auth用明文密码,而digest 用sha-1+base64加密后的密码。在实际使用中digest 更常见。 |
2.permission权限位
权限位 | 权限 | 描述 |
---|---|---|
c | CREATE | 可以创建子节点 |
d | DELETE | 可以删除子节点(仅下一级节点) |
r | READ | 可以读取节点数据及显示子节点列表 |
w | WRITE | 可以设置节点数据 |
a | ADMIN | 可以设置节点访问控制列表权限 |
3.acl 相关命令
命令 | 使用方式 | 描述 |
---|---|---|
getAcl | getAcl <path> | 读取ACL权限 |
setAcl | setAcl <path> <acl> | 设置ACL权限 |
addauth | addauth <scheme> <auth> | 添加认证用户 |
- world权限示例
语法: setAcl <path> world:anyone:<权限位>
注:world模式中anyone是唯一的值,表示所有人
//创建一个节点
create -e /testAcl
//查看节点权限
getAcl /testAcl
//返回的默认权限表示 ,所有人拥有所有权限。
'world,'anyone: cdrwa
//设置为rw权限
setAcl /testAcl world:anyone:rw
//可以正常读
get /testAcl
//无法正常创建子节点
create -e /testAcl/t "hi"
//返回没有权限的异常
Authentication is not valid : /testAcl/t
- IP权限示例
语法: setAcl <path> ip:<ip地址|地址段>:<权限位>
setAcl /testAcl ip:192.168.0.1:rw
- auth模式示例
语法:
setAcl <path> auth:<用户名>:<密码>:<权限位>
addauth digest <用户名>:<密码>
- digest 权限示例:
语法:
setAcl <path> digest :<用户名>:<密钥>:<权限位>
addauth digest <用户名>:<密码>
注1:密钥 通过sha1与base64组合加密码生成,可通过以下命令生成
echo -n <用户名>:<密码> | openssl dgst -binary -sha1 | openssl base64
注2:为节点设置digest 权限后,访问前必须执行addauth,当前会话才可以防问。
设置digest 权限
先 sha1 加密,然后base64加密
echo -n luban:123456 | openssl dgst -binary -sha1 | openssl base64
返回密钥
2Rz3ZtRZEs5RILjmwuXW/wT13Tk=
设置digest权限
setAcl /luban digest:luban:2Rz3ZtRZEs5RILjmwuXW/wT13Tk=:cdrw
四、Zookeeper在Java中的应用
zookeeper 提供了java与C两种语言的客户端。我们要学习的就是java客户端。引入maven依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
客户端
ZooKeeper zooKeeper;
@Before
public void init() throws IOException {
String conn = "192.168.0.149:2181";
zooKeeper = new ZooKeeper(conn, 4000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath());
System.out.println(event);
}
});
}
@Test
public void getData() throws KeeperException, InterruptedException {
Stat stat = new Stat();
zooKeeper.getData("/tuling", new Watcher() {
//监听事件
@Override
public void process(WatchedEvent event) {
try {
//再次监听
zooKeeper.getData(event.getPath(), this, null);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(event.getPath());
}
}, stat);
System.out.println(stat);
Thread.sleep(Long.MAX_VALUE);
}
在getData() 与getChildren()两个方法中可分别设置监听数据变化和子节点变化。通过设置watch为true,当前事件触发时会调用zookeeper()构建函数中Watcher.process()方法。也可以添加watcher参数来实现自定义监听。一般采用后者。
注意:所有的监听都是一次性的,如果要持续监听需要触发后在添加一次监听。
@Test
public void getChild2() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/tuling", event -> {
//父节点
System.out.println(event.getPath());
//子节点
zooKeeper.getChildren(event.getPath(), false);
});
children.stream().forEach(System.out::println);
Thread.sleep(Long.MAX_VALUE);
}
注意:以上为监听子节点的操作。只有子节点的增加、删除才能监听得到,仅仅修改子节点是无法监听得到的。
@Test
public void getData4() throws KeeperException, InterruptedException {
zooKeeper.getData("/tuling", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println(stat);
}
}, "");
Thread.sleep(Long.MAX_VALUE);
}
注意:DataCallback为getData的回调函数,这边的数据更加全面,rc为返回状态,ctx为自己传入的上下文,会原封不动地返回了。
@Test
public void createData() throws KeeperException, InterruptedException {
List<ACL> list = new ArrayList<>();
int perm = ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ;//cdwra
ACL acl = new ACL(perm, new Id("world", "anyone"));
ACL acl2 = new ACL(perm, new Id("ip", "192.168.0.149"));
ACL acl3 = new ACL(perm, new Id("ip", "127.0.0.1"));
list.add(acl);
list.add(acl2);
list.add(acl3);
zooKeeper.create("/tuling/lu", "hello".getBytes(), list, CreateMode.PERSISTENT);
}
以上为创建节点,并设置节点的权限。
五、Zookeeper集群
zookeeper集群的目的是为了保证系统的性能承载更多的客户端连接设专门提供的机制。通过集群可以实现以下功能:
- 读写分离:提高承载,为更多的客户端提供连接,并保障性能。
- 主从自动切换:提高服务容错性,部分节点故障不会影响整个服务集群。
半数以上运行机制说明
集群至少需要三台服务器,并且强烈建议使用奇数个服务器。因为zookeeper 通过判断大多数节点的存活来判断整个服务是否可用。比如3个节点,挂掉了2个表示整个集群挂掉,而用偶数4个,挂掉了2个也表示其并不是大部分存活,因此也会挂掉。
集群角色说明
zookeeper 集群中总共有三种角色,分别是leader(主节点)follower(子节点) observer(次级子节点)
角色 | 描述 |
---|---|
leader | 主节点,又名领导者。用于写入数据,通过选举产生,如果宕机将会选举新的主节点。未产生主节点,集群不能对外开放 |
follower | 子节点,又名追随者。用于实现数据的读取。同时他也是主节点的备选节点,并用拥有投票权。 |
observer | 次级子节点,又名观察者。用于读取数据,与fllower区别在于没有投票权,不能选为主节点。并且在计算集群可用状态时不会将observer计算入内。 |
observer配置:
只要在集群配置中加上observer后缀即可,示例如下:
server.3=127.0.0.1:2889:3889:observer
配置语法
server.<节点ID>=<ip>:<数据同步端口>:<选举端口>
- 节点ID:服务id手动指定1至125之间的数字,并写到对应服务节点的 {dataDir}/myid 文件中。
- IP地址:节点的远程IP地址,可以相同。但生产环境就不能这么做了,因为在同一台机器就无法达到容错的目的。所以这种称作为伪集群。
- 数据同步端口:主从同时数据复制端口,(做伪集群时端口号不能重复)。
- 远举端口:主从节点选举端口,(做伪集群时端口号不能重复)。
- 配置文件示例
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
//以下为集群配置,必须配置在所有节点的zoo.cfg文件中
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
集群配置流程
- 分别创建3个data目录用于存储各节点数据
mkdir data
mkdir data/1
mkdir data/2
mkdir data/3
- 编写myid文件
echo 1 > data/1/myid
echo 2 > data/2/myid
echo 3 > data/3/myid
- 编写配置文件
***************//节点1//***************
conf/zoo1.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=data/1
clientPort=2181
//集群配置
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
***************//节点2//***************
conf/zoo2.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=data/2
clientPort=2182
//集群配置
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
***************//节点3//***************
conf/zoo3.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=data/3
clientPort=2183
//集群配置
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
- 分别启动
./bin/zkServer.sh start conf/zoo1.cfg
./bin/zkServer.sh start conf/zoo2.cfg
./bin/zkServer.sh start conf/zoo3.cfg
- 分别查看状态
./bin/zkServer.sh status conf/zoo1.cfg
Mode: follower
./bin/zkServer.sh status conf/zoo2.cfg
Mode: leader
./bin/zkServer.sh status conf/zoo3.cfg
Mode: follower
- 检查集群复制情况
分别连接指定节点
zkCli.sh 后加参数-server 表示连接指定IP与端口。
./bin/zkCli.sh -server 127.0.0.1:2181
./bin/zkCli.sh -server 127.0.0.1:2182
./bin/zkCli.sh -server 127.0.0.1:2183
任意节点中创建数据,查看其它节点已经同步成功。
注意: -server参数后同时连接多个服务节点,并用逗号隔开 127.0.0.1:2181,127.0.0.1:2182
选举机制
通过 ./bin/zkServer.sh status <zoo配置文件> 命令可以查看到节点状态
./bin/zkServer.sh status conf/zoo1.cfg
Mode: follower
./bin/zkServer.sh status conf/zoo2.cfg
Mode: leader
./bin/zkServer.sh status conf/zoo3.cfg
Mode: follower
可以发现中间的2182 是leader状态.其选举机制如下图:
image.png
-
投票机制说明
第一轮投票全部投给自己
第二轮投票给myid比自己大的相邻节点
如果得票超过半数,选举结束。 -
选举触发
当集群中的服务器出现已下两种情况时会进行Leader的选举
1.服务节点初始化启动
2.半数以上的节点无法和Leader建立连接
当节点初始起动时会在集群中寻找Leader节点,如果找到则与Leader建立连接,其自身状态变化follower或observer。如果没有找到Leader,当前节点状态将变化LOOKING,进入选举流程。
在集群运行其间如果有follower或observer节点宕机只要不超过半数并不会影响整个集群服务的正常运行。但如果leader宕机,将暂停对外服务,所有follower将进入LOOKING 状态,进入选举流程。
数据同步机制
zookeeper 的数据同步是为了保证各节点中数据的一至性,同步时涉及两个流程,一个是正常的客户端数据提交,另一个是集群某个节点宕机在恢复后的数据同步。
-
客户端写入请求
写入请求的大至流程是,收leader接收客户端写请求,并同步给各个子节点。如下图:
image.png
但实际情况要复杂的多,比如client 它并不知道哪个节点是leader 有可能写的请求会发给follower ,由follower在转发给leader进行同步处理
image.png
1.client向zk中的server发送写请求,如果该server不是leader,则会将该写请求转发给leader server,leader将请求事务以proposal形式分发给follower;
2.当follower收到leader的proposal时,根据接收的先后顺序处理proposal;
3.当Leader收到follower针对某个proposal过半的ack后,则发起事务提交,重新发起一个commit的proposal;
4.Follower收到commit的proposal后,记录事务提交,并把数据更新到内存数据库;
5.当写成功后,反馈给client。
注意:leader宕机在恢复阶段,集群暂停对外服务,知道集群重新把leader选举出来;前leader宕机恢复后不再是leader,同时它还要同步最新数据,期间它是不对外开放服务的,也只有它其它正常。
-
服务节点初始化同步
在集群运行过程当中如果有一个follower节点宕机,由于宕机节点没过半,集群仍然能正常服务。当leader 收到新的客户端请求,此时无法同步给宕机的节点。造成数据不一至。为了解决这个问题,当节点启动时,第一件事情就是找当前的Leader,比对数据是否一至。不一至则开始同步,同步完成之后在进行对外提供服务。
如何比对Leader的数据版本呢,这里通过ZXID事物ID来确认。比Leader就需要同步。 -
ZXID说明
ZXID是一个长度64位的数字,其中低32位是按照数字递增,任何数据的变更都会导致低32位的数字简单加1。高32位是leader周期编号,每当选举出一个新的leader时,新的leader就从本地事物日志中取出ZXID,然后解析出高32位的周期编号,进行加1,再将低32位的全部设置为0。这样就保证了每次新选举的leader后,保证了ZXID的唯一性而且是保证递增的。
注意:leader选举过程中会比较节点中的ZXID值,ZXID值越大谁就是leader,其后才是根据节点的myId来。
六、zookeeper 典型使⽤场景实践
分布式集群管理
需求
- 主动查看线上服务节点
- 查看服务节点资源使用情况
- 服务离线通知
- 服务资源(CPU、内存、硬盘)超出阀值通知
架构设计
image.png实现
//被监听节点上报类
public class Agent {
private static Agent ourInstance = new Agent();
private String server = "192.168.0.149:2181";
private ZkClient zkClient;
private static final String rootPath = "/manger";
private static final String servicePath = rootPath + "/service";
private String nodePath; // /manger/service0000001 当前节点路径
private Thread stateThread;
public static Agent getInstance() {
return ourInstance;
}
private Agent() {
}
// javaagent 数据监控
public static void premain(String args, Instrumentation instrumentation) {
Agent.getInstance().init();
}
public void init() {
zkClient = new ZkClient(server, 5000, 10000);
System.out.println("zk连接成功" + server);
// 创建根节点
buildRoot();
// 创建临时节点
createServerNode();
// 启动更新的线程
stateThread = new Thread(() -> {
while (true) {
updateServerNode();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "zk_stateThread");
stateThread.setDaemon(true);
stateThread.start();
}
// 数据写到 当前的临时节点中去
public void updateServerNode() {
zkClient.writeData(nodePath, getOsInfo());
}
// 生成服务节点
public void createServerNode() {
nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
System.out.println("创建节点:" + nodePath);
}
// 更新服务节点状态
public String getOsInfo() {
OsBean bean = new OsBean();
bean.lastUpdateTime = System.currentTimeMillis();
bean.ip = getLocalIp();
bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024;
bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024;
bean.pid = ManagementFactory.getRuntimeMXBean().getName();
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(bean);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
//获取IP
public static String getLocalIp() {
InetAddress addr = null;
try {
addr = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
return addr.getHostAddress();
}
//创建永久父节点
public void buildRoot() {
if (!zkClient.exists(rootPath)) {
zkClient.createPersistent(rootPath);
}
}
}
// 初始化订阅事件
public void initSubscribeListener() {
zkClient.unsubscribeAll();
// 获取所有子节点
zkClient.getChildren(rootPath)
.stream()
.map(p -> rootPath + "/" + p)// 得出子节点完整路径
.forEach(p -> {
zkClient.subscribeDataChanges(p, new DataChanges());// 数据变更的监听
});
// 监听父节点下的子节点的 增加,删除 变更
zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> initSubscribeListener());
}
// 子节点数据变化
private class DataChanges implements IZkDataListener {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
OsBean bean = convert((String) data);
map.put(dataPath, bean);
doFilter(bean);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
if (map.containsKey(dataPath)) {
OsBean bean = map.get(dataPath);
System.err.println("服务已下线:" + bean);
map.remove(dataPath);
}
}
}
// 警告过滤
private void doFilter(OsBean bean) {
// cpu 超过10% 报警
if (bean.getCpu() > 10) {
System.err.println("CPU 报警..." + bean.getCpu());
}
}
分布式注册中心
在单体式服务中,通常是由多个客户端去调⽤⼀个服务,只要在客户端中配置唯⼀服务节点地址即可,当升级到分布式后,服务节点变多,像阿⾥⼀线⼤⼚服务节点更是上万之多,这么多节点不可能⼿动配置在客户端,这⾥就需要⼀个中间服务,专⻔⽤于帮助客户端发现服务节点,即许多技术书籍经常提到的服务发现。
一个完整的注册中心涵盖以下功能特性:
- 服务注册:提供者上线时将自提供的服务提交给注册中心。
- 服务注销:通知注册心提供者下线。
- 服务订阅:动态实时接收服务变更消息。
- 可靠:注册服务本身是集群的,数据冗余存储。避免单点故障,及数据丢失。
- 容错:当服务提供者出现宕机,断电等极情况时,注册中心能够动态感知并通知客户端服务提供者的状态。
Dubbo 对zookeeper的使用
阿里著名的开源项目Dubbo 是一个基于JAVA的RCP框架,其中必不可少的注册中心可基于多种第三方组件实现,但其官方推荐的还是Zookeeper做为注册中心服务。
image.png
Dubbo Zookeeper注册中心存储结构
image.png节点说明
类别 | 属性 | 说明 |
---|---|---|
Root | 持久节点 | 根节点名称,默认是 "dubbo" |
Service | 持久节点 | 服务名称,完整的服务类名 |
type | 持久节点 | 可选值:providers(提供者)、consumers(消费者)、configurators(动态配置)、routers |
URL | 临时节点 | url名称 包含服务提供者的 IP 端口 及配置等信息。 |
- 服务端
public class Server {
public void openServer(int port) {
// 构建应用
ApplicationConfig config = new ApplicationConfig();
config.setName("simple-app");
// 通信协议
ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port);
protocolConfig.setThreads(200);
ServiceConfig<UserService> serviceConfig = new ServiceConfig();
serviceConfig.setApplication(config);
serviceConfig.setProtocol(protocolConfig);
serviceConfig.setRegistry(new RegistryConfig("zookeeper://192.168.0.149:2181"));
serviceConfig.setInterface(UserService.class);
UserServiceImpl ref = new UserServiceImpl();
serviceConfig.setRef(ref);
//开始提供服务 开张做生意
serviceConfig.export();
System.out.println("服务已开启!端口:"+serviceConfig.getExportedUrls().get(0).getPort());
ref.setPort(serviceConfig.getExportedUrls().get(0).getPort());
}
public static void main(String[] args) throws IOException {
new Server().openServer(-1);
System.in.read();
}
}
- 客户端
public class Client {
UserService service;
// URL 远程服务的调用地址
public UserService buildService(String url) {
ApplicationConfig config = new ApplicationConfig("young-app");
// 构建一个引用对象
ReferenceConfig<UserService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(config);
referenceConfig.setInterface(UserService.class);
// referenceConfig.setUrl(url);
referenceConfig.setRegistry(new RegistryConfig("zookeeper://192.168.0.149:2181"));
referenceConfig.setTimeout(5000);
// 透明化
this.service = referenceConfig.get();
return service;
}
static int i = 0;
public static void main(String[] args) throws IOException {
Client client1 = new Client();
client1.buildService("");
String cmd;
while (!(cmd = read()).equals("exit")) {
UserVo u = client1.service.getUser(Integer.parseInt(cmd));
System.out.println(u);
}
}
private static String read() throws IOException {
byte[] b = new byte[1024];
int size = System.in.read(b);
return new String(b, 0, size).trim();
}
}
分布式JOB
- 分布式JOB需求
- 多个服务节点只允许其中一个主节点运行JOB任务。
- 当主节点挂掉后能自动切换主节点,继续执行JOB任务。
-
架构设计
image.png - node结构
父节点:tuling-master
子节点:server0001:master
子节点:server0002:slave
子节点:server000n:slave
- 选举流程
服务启动:
- 在tuling-maste下创建server子节点,值为slave
- 获取所有tuling-master 下所有子节点
- 判断是否存在master 节点
- 如果没有设置自己为master节点
子节点删除事件触发:
- 获取所有tuling-master 下所有子节点
- 判断是否存在master 节点
- 如果没有设置最小值序号为master 节点
public class MasterResolve {
private String server = "192.168.0.149:2181";
private ZkClient zkClient;
private static final String rootPath = "/tuling-master";
private static final String servicePath = rootPath + "/service";
private String nodePath;
private volatile boolean master = false;
private static MasterResolve resolve;
private MasterResolve() {
zkClient = new ZkClient(server, 2000, 5000);
buildRoot();
createServerNode();
}
public static MasterResolve getInstance() {
if (resolve == null) {
resolve= new MasterResolve();
}
return resolve;
}
// 构建根节点
public void buildRoot() {
if (!zkClient.exists(rootPath)) {
zkClient.createPersistent(rootPath);
}
}
// 创建server节点
public void createServerNode() {
nodePath = zkClient.createEphemeralSequential(servicePath, "slave");
System.out.println("创建service节点:" + nodePath);
initMaster();
initListener();
}
private void initMaster() {
boolean existMaster = zkClient.getChildren(rootPath)
.stream()
.map(p -> rootPath + "/" + p)
.map(p -> zkClient.readData(p))
.anyMatch(d -> "master".equals(d));
if (!existMaster) {
doElection();
System.out.println("当前当选master");
}
}
private void initListener() {
zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> {
doElection();// 执行选举
});
}
// 执行选举
public void doElection() {
Map<String, Object> childData = zkClient.getChildren(rootPath)
.stream()
.map(p -> rootPath + "/" + p)
.collect(Collectors.toMap(p -> p, p -> zkClient.readData(p)));
if (childData.containsValue("master")) {
return;
}
childData.keySet().stream().sorted().findFirst().ifPresent(p -> {
if (p.equals(nodePath)) { // 设置最小值序号为master 节点
zkClient.writeData(nodePath, "master");
master = true;
System.out.println("当前当选master" + nodePath);
}
});
}
public static boolean isMaster() {
return getInstance().master;
}
}
public class MasterResolveTest {
// job 定时任务
@Test
public void MasterTest() throws InterruptedException {
MasterResolve instance = MasterResolve.getInstance();
System.out.println("master:" + MasterResolve.isMaster());
Thread.sleep(Long.MAX_VALUE);
}
}
分布式锁
锁的的基本概念
开发中锁的概念并不陌⽣,通过锁可以实现在多个线程或多个进程间在争抢资源时,能够合理的分配置资源的所有权。在单体应⽤中我们可以通过 synchronized 或ReentrantLock 来实现锁。但在分布式系统中,仅仅是加synchronized 是不够的,需要借助第三组件来实现。⽐如⼀些简单的做法是使⽤ 关系型数据⾏级锁来实现不同进程之间的互斥,但⼤型分布式系统的性能瓶颈往往集中在数据库操作上。为了提⾼性能得采⽤如Redis、Zookeeper之内的组件实现分布式锁。
共享锁:也称作只读锁,当一方获得共享锁之后,其它方也可以获得共享锁。但其只允许读取。在共享锁全部释放之前,其它方不能获得写锁。
排它锁:也称作读写锁,获得排它锁后,可以进行数据的读写。在其释放之前,其它方不能获得任何锁。
锁的获取
zookeeper分布式锁的获取是针对多进程来说的,zookeeper节点可以作为多进程间竞争的唯一资源。锁的操作分读锁、写锁,连续的读锁可以让多个进程并发执行,但是写锁只能由一个进程单独执行,而且还会影响后面锁(无论读锁还是写锁)的操作。
zookeeper分布式锁中,所有的任务线程都会在root节点下面排好序,也就是一个个临时有序的子节点,获取锁的子节点执行完后会释放删除。等待的临时有序子节点只监听到拿到锁的临时节点删除,就判断自己节点的序号是否最小,是的话就轮到自己执行。
羊群效应:产生羊群效应效应的原因是所有的临时子节点都去监听的是root节点下方子节点数量的变化,若子节点有成千上万个的话,会造成所有子节点去读取root节点这一节点,这一过程必然会造成拥挤瓶颈问题。
解决羊群效应:只要去除所有临时子节点对root节点的监听事件,改为所有临时有序节点只监听上一个子节点就行。
public class Lock {
private String lockId;
private String path;
private boolean active;
public Lock(String lockId, String path) {
this.lockId = lockId;
this.path = path;
}
public Lock() {
}
public String getLockId() {
return lockId;
}
public void setLockId(String lockId) {
this.lockId = lockId;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public boolean isActive() {
return active;
}
public void setActive(boolean active) {
this.active = active;
}
}
public class ZookeeperLock {
private String server = "192.168.0.149:2181";
private ZkClient zkClient;
private static final String rootPath = "/master-lock1";
public ZookeeperLock() {
zkClient = new ZkClient(server, 5000, 20000);
buildRoot();
}
// 构建根节点
public void buildRoot() {
if (!zkClient.exists(rootPath)) {
zkClient.createPersistent(rootPath);
}
}
// 获取锁
public Lock lock(String lockId, long timeout) {
// 创建临时节点
Lock lockNode = createLockNode(lockId);
lockNode = tryActiveLock(lockNode);// 尝试激活锁
if (!lockNode.isActive()) {
try {
synchronized (lockNode) {
lockNode.wait(timeout); // 线程锁住
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (!lockNode.isActive()) {
throw new RuntimeException(" lock timeout");
}
return lockNode;
}
// 释放锁
public void unlock(Lock lock) {
if (lock.isActive()) {
zkClient.delete(lock.getPath());
}
}
// 尝试激活锁
private Lock tryActiveLock(Lock lockNode) {
// 获取根节点下面所有的子节点
List<String> list = zkClient.getChildren(rootPath)
.stream()
.sorted()
.map(p -> rootPath + "/" + p)
.collect(Collectors.toList()); // 判断当前是否为最小节点
String firstNodePath = list.get(0);
// 最小节点是不是当前节点
if (firstNodePath.equals(lockNode.getPath())) {
lockNode.setActive(true);
} else {
String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 事件处理 与心跳 在同一个线程,如果Debug时占用太多时间,将导致本节点被删除,从而影响锁逻辑。
System.out.println("节点删除:" + dataPath);
Lock lock = tryActiveLock(lockNode);
synchronized (lockNode) {
if (lock.isActive()) {
lockNode.notify(); // 释放了
}
}
zkClient.unsubscribeDataChanges(upNodePath, this);
}
});
}
return lockNode;
}
public Lock createLockNode(String lockId) {
String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
return new Lock(lockId, nodePath);
}
}
public class LockTest {
ZookeeperLock zookeeperLock;
static Long count = 0L;
@Before
public void init() {
zookeeperLock = new ZookeeperLock();
}
@Test
public void getLockTest() throws InterruptedException {
Lock lock = zookeeperLock.lock("luban", 60 * 1000);
System.out.println("成功获取锁");
Thread.sleep(Long.MAX_VALUE);
assert lock != null;
}
@Test
public void run() throws InterruptedException, IOException {
// 写数字 0+100 =100
File file = new File("d:/test.txt");
if (!file.exists()) {
file.createNewFile();
}
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {// 1000 个线程存在问题
executorService.submit(() -> {
Lock lock = zookeeperLock.lock(file.getPath(), 60 * 1000);
try {
String firstLine = Files.lines(file.toPath()).findFirst().orElse("0");
int count = Integer.parseInt(firstLine);
count++;
Files.write(file.toPath(), String.valueOf(count).getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
zookeeperLock.unlock(lock);
}
});
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
String firstLine = Files.lines(file.toPath()).findFirst().orElse("0");
System.out.println(firstLine);
}
}
七、zookeeper 源代码
工程结构介绍
项目地址:https://github.com/apache/zookeeper.git
l zookeeper-recipes: 示例源码
l zookeeper-client: C语言客户端
l zookeeper-server:主体源码
注意1:zookeeper对外网络通讯采用的是java nio技术,其中有自己实现的nio,也有netty实现的,默认为自己实现的nio,官网推荐netty。
注意2:zookeeper数据操作是操作内存数据库的,持久节点会存储在磁盘内,但每次初始化都是先把磁盘的快照与日志加载到内存来(快照与日志文件加载到dataTree),再进行操作。
- QuorumPeerMain#main //启动main方法
- - QuorumPeerConfig#parse // 加载zoo.cfg 文件
- - - QuorumPeerConfig#parseProperties // 解析配置
- - DatadirCleanupManager#start // 启动定时任务清除日志
- - QuorumPeerConfig#isDistributed // 判断是否为集群模式
- - - ServerCnxnFactory#createFactory() // 创建服务默认为NIO,推荐netty
- QuorumPeerMain#getQuorumPeer
- QuorumPeer#setTxnFactory
- new FileTxnSnapLog // 数据文件管理器,用于检测快照与日志文件
- - new ZKDatabase //初始化数据库
- - - ZKDatabase#createDataTree //创建数据树,所有的节点都会存储在这
- - QuorumPeer#start // 启动集群:同时启动线程
- - - QuorumPeer#loadDataBase // 从快照文件以及日志文件 加载节点并填充到dataTree中去
- - - QuorumPeer#startServerCnxnFactory // 启动netty 或java nio 服务,对外开放2181 端口
- - - AdminServer#start// 启动管理服务,netty http服务,http://ip:8080/commands/stat
- - - QuorumPeer#startLeaderElection // 开始执行选举流程
- - - quorumPeer.join() // 防止主进程退出
注意3:client与zookeeper server间的通信是通过netty/nio进行的,比如client的create操作或者server的返回值。
快照与事务日志存储结构
ZK中所有的数据都是存储在内存中,即zkDataBase中。但同时所有对ZK数据的变更都会记录到事物日志中,并且当写入到一定的次数就会进行一次快照的生成。已保证数据的备份。其后缀就是ZXID(唯一事物ID)。
- 事物日志:每次增删改,的记录日志都会保存在文件当中
- 快照日志:存储了在指定时间节点下的所有的数据
注意1:快照不会实时更新,但是事物日志会记录每次增删改的操作,事物日志在记录10万次后才会写入快照文件中,其中日志文件后缀就是ZXID(唯一事物ID)。数据的更新先内存后文件再进行返回。
读取快照日志:
org.apache.zookeeper.server.SnapshotFormatter
读取事物日志:
org.apache.zookeeper.server.LogFormatter
快照相关配置
dataLogDir | 事物日志目录 |
---|---|
zookeeper.preAllocSize | 预先开辟磁盘空间,用于后续写入事务日志,默认64M |
zookeeper.snapCount | 每进行snapCount次事务日志输出后,触发一次快照,默认是100,000 |
autopurge.snapRetainCount | 自动清除时 保留的快照数 |
autopurge.purgeInterval | 清除时间间隔,小时为单位 -1 表示不自动清除。 |
网友评论