点赞再看,养成习惯,微信搜一搜【一角钱技术】关注更多原创技术文章。
本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。
前言
前两篇讲了Zookeeper的特性、客户端使用和集群原理,因为 Zookeeper 是分布式系统中很常见的一个基础系统。 而且问的话常问的就是说 zookeeper 的使用场景是什么? 看你知道不知道一些基本的使用场景。 但是其实 Zookeeper 挖深了自然是可以问的很深很深的。本文主要来聊聊 Zookeeper 主要的几个使用场景。
- 分布式集群管理
- 分布式注册中心
- 分布式JOB
- 分布式锁
分布式集群管理
分布式集群管理的需求
- 主动查看线上服务节点
- 查看服务节点资源使用情况
- 服务离线通知
- 服务资源(CPU、内存、硬盘)超出阀值通知
架构设计
节点结构
- niuh-manger // 根节点
- server00001 : //服务节点 1
- server00002 ://服务节点 2
- server........n ://服务节点 n
服务状态信息
- ip
- cpu
- memory
- disk
功能实现
数据生成与上报
- 创建临时节点:
- 定时变更节点状态信息:
主动查询
- 实时查询 zookeeper 获取集群节点的状态信息。
被动通知
- 监听根节点下子节点的变化情况,如果CPU 等硬件资源低于警告位则发出警报。
关键示例代码
package com.niuh.os;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.I0Itec.zkclient.ZkClient;
import java.lang.instrument.Instrumentation;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class Agent {
private static Agent ourInstance = new Agent();
private String server = "127.0.0.1:2181";
private ZkClient zkClient;
private static final String rootPath = "/niuh-manger";
private static final String servicePath = rootPath + "/service";
private String nodePath; ///niuh-manger/service0000001 当前节点路径
private Thread stateThread;
public static Agent getInstance() {
return ourInstance;
}
private Agent() {
}
// javaagent 数据监控
public static void premain(String args, Instrumentation instrumentation) {
Agent.getInstance().init();
}
public void init() {
zkClient = new ZkClient(server, 5000, 10000);
System.out.println("zk连接成功" + server);
// 创建根节点
buildRoot();
// 创建临时节点
createServerNode();
// 启动更新的线程
stateThread = new Thread(() -> {
while (true) {
updateServerNode();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "zk_stateThread");
stateThread.setDaemon(true);
stateThread.start();
}
// 数据写到 当前的临时节点中去
public void updateServerNode() {
zkClient.writeData(nodePath, getOsInfo());
}
// 生成服务节点
public void createServerNode() {
nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
System.out.println("创建节点:" + nodePath);
}
// 更新服务节点状态
public String getOsInfo() {
OsBean bean = new OsBean();
bean.lastUpdateTime = System.currentTimeMillis();
bean.ip = getLocalIp();
bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024;
bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024;
bean.pid = ManagementFactory.getRuntimeMXBean().getName();
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(bean);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static String getLocalIp() {
InetAddress addr = null;
try {
addr = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
return addr.getHostAddress();
}
public void buildRoot() {
if (!zkClient.exists(rootPath)) {
zkClient.createPersistent(rootPath);
}
}
}
实现效果
启动参数设置
运行测试用例:
package com.niuh.test;
import com.niuh.os.Agent;
import org.junit.Ignore;
import org.junit.Test;
public class AgentTest {
@Test
@Ignore
public void initTest() {
Agent.premain(null, null);
runCPU(2); //20% 占用
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//
private void runCPU(int count) {
for (int i = 0; i < count; i++) {
new Thread(() -> {
while (true) {
long bac = 1000000;
bac = bac >> 1;
}
}).start();
;
}
}
}
控制台输出:
CPU 报警...22.55120088850181
CPU 报警...46.06592086097357
CPU 报警...47.87206766163349
CPU 报警...49.49176420213768
CPU 报警...48.967942479969004
CPU 报警...49.193921607021565
CPU 报警...48.806604284784676
CPU 报警...48.63229912951865
CPU 报警...49.34509647972038
CPU 报警...47.07551108884401
CPU 报警...49.18489236134496
CPU 报警...49.903007346777066
CPU 报警...49.28868795953268
// 关闭测试用例
服务已下线:OsBean{ip='192.168.43.11', cpu=49.28868795953268, usedMemorySize=56, usableMemorySize=3641, pid='47192@hejianhui', lastUpdateTime=1602056208842}
本Demo不适用在生产环境,示例Demo涉及组件zookeeper-agent、zookeeper-web。源代码提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
分布式注册中心
在单体式服务中,通常是由多个客户端去调用一个服务,只要在客户端中配置唯一服务节点地址即可,当升级到分布式后,服务节点变多,像一线大厂服务节点更是上万之多,这么多节点不可能手动配置在客户端,这里就需要一个中间服务,专门用于帮助客户端发现服务节点,即许多技术书籍经常提到的服务发现。
一个完整的注册中心涵盖以下功能特性:
- 服务注册:提供者上线时将自提供的服务提交给注册中心。
- 服务注销:通知注册心提供者下线。
- 服务订阅:动态实时接收服务变更消息。
- 可靠:注册服务本身是集群的,数据冗余存储。避免单点故障,及数据丢失。
- 容错:当服务提供者出现宕机,断电等极情况时,注册中心能够动态感知并通知客户端服务提供者的状态。
Dubbo 对 Zookeeper的使用
阿里著名的开源项目Dubbo 是一个基于JAVA的RCP框架,其中必不可少的注册中心可基于多种第三方组件实现,但其官方推荐的还是Zookeeper做为注册中心服务。
Dubbo Zookeeper注册中心存储结构
节点说明
类别 | 属性 | 说明 |
---|---|---|
Root | 持久节点 | 根节点名称,默认是 "dubbo" |
Service | 持久节点 | 服务名称,完整的服务类名 |
type | 持久节点 | 可选值:providers(提供者)、consumers(消费者)、configurators(动态配置)、routers |
URL | 临时节点 | url名称 包含服务提供者的 IP 端口 及配置等信息。 |
流程说明
- 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址
- 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
- 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。
示例Demo
服务端代码
package com.niuh.zk.dubbo;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.alibaba.dubbo.config.ServiceConfig;
import java.io.IOException;
public class Server {
public void openServer(int port) {
// 构建应用
ApplicationConfig config = new ApplicationConfig();
config.setName("simple-app");
// 通信协议
ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port);
protocolConfig.setThreads(200);
ServiceConfig<UserService> serviceConfig = new ServiceConfig();
serviceConfig.setApplication(config);
serviceConfig.setProtocol(protocolConfig);
serviceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
serviceConfig.setInterface(UserService.class);
UserServiceImpl ref = new UserServiceImpl();
serviceConfig.setRef(ref);
//开始提供服务 开张做生意
serviceConfig.export();
System.out.println("服务已开启!端口:"+serviceConfig.getExportedUrls().get(0).getPort());
ref.setPort(serviceConfig.getExportedUrls().get(0).getPort());
}
public static void main(String[] args) throws IOException {
new Server().openServer(-1);
System.in.read();
}
}
客户端代码
package com.niuh.zk.dubbo;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import java.io.IOException;
public class Client {
UserService service;
// URL 远程服务的调用地址
public UserService buildService(String url) {
ApplicationConfig config = new ApplicationConfig("young-app");
// 构建一个引用对象
ReferenceConfig<UserService> referenceConfig = new ReferenceConfig<UserService>();
referenceConfig.setApplication(config);
referenceConfig.setInterface(UserService.class);
// referenceConfig.setUrl(url);
referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
referenceConfig.setTimeout(5000);
// 透明化
this.service = referenceConfig.get();
return service;
}
static int i = 0;
public static void main(String[] args) throws IOException {
Client client1 = new Client();
client1.buildService("");
String cmd;
while (!(cmd = read()).equals("exit")) {
UserVo u = client1.service.getUser(Integer.parseInt(cmd));
System.out.println(u);
}
}
private static String read() throws IOException {
byte[] b = new byte[1024];
int size = System.in.read(b);
return new String(b, 0, size).trim();
}
}
查询 zk 实际存储内容:
/dubbo
/dubbo/com.niuh.zk.dubbo.UserService
/dubbo/com.niuh.zk.dubbo.UserService/configurators
/dubbo/com.niuh.zk.dubbo.UserService/routers
/dubbo/com.niuh.zk.dubbo.UserService/providers
/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo://192.168.43.11:20880/com.niuh.zk.dubbo.UserService?anyhost=true&application=simple-app&dubbo=2.6.2&generic=false&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=48302&side=provider&threads=200×tamp=1602057895881
/dubbo/com.niuh.zk.dubbo.UserService/consumers
/dubbo/com.niuh.zk.dubbo.UserService/consumers/consumer://192.168.43.11com.niuh.zk.dubbo.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=49036&side=consumer&timeout=5000×tamp=1602075359549
示例Demo涉及组件zookeeper-dubbo。源代码提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
分布式JOB
分布式JOB需求
- 多个服务节点只允许其中一个主节点运行JOB任务。
- 当主节点挂掉后能自动切换主节点,继续执行JOB任务。
架构设计
node结构
- niuh-master
- server0001:master
- server0002:slave
- server000n:slave
选举流程
服务启动:
- 在niuh-maste下创建server子节点,值为slave
- 获取所有niuh-master 下所有子节点
- 判断是否存在master 节点
- 如果没有设置自己为master节点
子节点删除事件触发:
- 获取所有niuh-master 下所有子节点
- 判断是否存在master 节点
- 如果没有设置最小值序号为master 节点
示例Demo
package com.niuh.zookeeper.master;
import org.I0Itec.zkclient.ZkClient;
import java.util.Map;
import java.util.stream.Collectors;
public class MasterResolve {
private String server = "127.0.0.1:2181";
private ZkClient zkClient;
private static final String rootPath = "/niuh-master";
private static final String servicePath = rootPath + "/service";
private String nodePath;
private volatile boolean master = false;
private static MasterResolve resolve;
private MasterResolve() {
zkClient = new ZkClient(server, 2000, 5000);
buildRoot();
createServerNode();
}
public static MasterResolve getInstance() {
if (resolve == null) {
resolve= new MasterResolve();
}
return resolve;
}
// 构建根节点
public void buildRoot() {
if (!zkClient.exists(rootPath)) {
zkClient.createPersistent(rootPath);
}
}
// 创建server节点
public void createServerNode() {
nodePath = zkClient.createEphemeralSequential(servicePath, "slave");
System.out.println("创建service节点:" + nodePath);
initMaster();
initListener();
}
private void initMaster() {
boolean existMaster = zkClient.getChildren(rootPath)
.stream()
.map(p -> rootPath + "/" + p)
.map(p -> zkClient.readData(p))
.anyMatch(d -> "master".equals(d));
if (!existMaster) {
doElection();
System.out.println("当前当选master");
}
}
private void initListener() {
zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> {
doElection();// 执行选举
});
}
// 执行选举
public void doElection() {
Map<String, Object> childData = zkClient.getChildren(rootPath)
.stream()
.map(p -> rootPath + "/" + p)
.collect(Collectors.toMap(p -> p, p -> zkClient.readData(p)));
if (childData.containsValue("master")) {
return;
}
childData.keySet().stream().sorted().findFirst().ifPresent(p -> {
if (p.equals(nodePath)) { // 设置最小值序号为master 节点
zkClient.writeData(nodePath, "master");
master = true;
System.out.println("当前当选master" + nodePath);
}
});
}
public static boolean isMaster() {
return getInstance().master;
}
}
示例Demo涉及组件zookeeper-master。源代码提交在 github。
分布式锁
锁的的基本概念
开发中锁的概念并不陌生,通过锁可以实现在多个线程或多个进程间在争抢资源时,能够合理的分配置资源的所有权。在单体应用中我们可以通过 synchronized
或 ReentrantLock
来实现锁。但在分布式系统中,仅仅是加synchronized 是不够的,需要借助第三组件来实现。比如一些简单的做法是使用关系型数据行级锁来实现不同进程之间的互斥,但大型分布式系统的性能瓶颈往往集中在数据库操作上。为了提高性能得采用如Redis、Zookeeper之内的组件实现分布式锁。
共享锁:也称作只读锁,当一方获得共享锁之后,其它方也可以获得共享锁。但其只允许读取。在共享锁全部释放之前,其它方不能获得写锁。
排它锁:也称作读写锁,获得排它锁后,可以进行数据的读写。在其释放之前,其它方不能获得任何锁。
锁的获取
某银行帐户,可以同时进行帐户信息的读取,但读取其间不能修改帐户数据。其帐户ID为:888
获得读锁流程
- 基于资源ID创建临时序号读锁节点
/lock/888.R0000000002 Read
- 获取 /lock 下所有子节点,判断其最小的节点是否为读锁,如果是则获锁成功
- 最小节点不是读锁,则阻塞等待。添加lock/ 子节点变更监听。
- 当节点变更监听触发,执行第2步
数据结构
获得写锁
- 基于资源ID创建临时序号写锁节点
/lock/888.R0000000002 Write
- 获取 /lock 下所有子节点,判断其最小的节点是否为自己,如果是则获锁成功
- 最小节点不是自己,则阻塞等待。添加lock/ 子节点变更监听。
- 当节点变更监听触发,执行第2步
释放锁
读取完毕后,手动删除临时节点,如果获锁期间宕机,则会在会话失效后自动删除。
关于羊群效应
在等待锁获得期间,所有等待节点都在监听 Lock节点,一但lock 节点变更所有等待节点都会被触发,然后在同时反查Lock 子节点。如果等待对例过大会使用Zookeeper承受非常大的流量压力。
为了改善这种情况,可以采用监听链表的方式,每个等待对列只监听前一个节点,如果前一个节点释放锁的时候,才会被触发通知。这样就形成了一个监听链表。
示例Demo
package com.niuh.zookeeper.lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
import java.util.stream.Collectors;
public class ZookeeperLock {
private String server = "127.0.0.1:2181";
private ZkClient zkClient;
private static final String rootPath = "/niuh-lock1";
public ZookeeperLock() {
zkClient = new ZkClient(server, 5000, 20000);
buildRoot();
}
// 构建根节点
public void buildRoot() {
if (!zkClient.exists(rootPath)) {
zkClient.createPersistent(rootPath);
}
}
// 获取锁
public Lock lock(String lockId, long timeout) {
// 创建临时节点
Lock lockNode = createLockNode(lockId);
lockNode = tryActiveLock(lockNode);// 尝试激活锁
if (!lockNode.isActive()) {
try {
synchronized (lockNode) {
lockNode.wait(timeout); // 线程锁住
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (!lockNode.isActive()) {
throw new RuntimeException(" lock timeout");
}
return lockNode;
}
// 释放锁
public void unlock(Lock lock) {
if (lock.isActive()) {
zkClient.delete(lock.getPath());
}
}
// 尝试激活锁
private Lock tryActiveLock(Lock lockNode) {
// 获取根节点下面所有的子节点
List<String> list = zkClient.getChildren(rootPath)
.stream()
.sorted()
.map(p -> rootPath + "/" + p)
.collect(Collectors.toList()); // 判断当前是否为最小节点
String firstNodePath = list.get(0);
// 最小节点是不是当前节点
if (firstNodePath.equals(lockNode.getPath())) {
lockNode.setActive(true);
} else {
String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 事件处理 与心跳 在同一个线程,如果Debug时占用太多时间,将导致本节点被删除,从而影响锁逻辑。
System.out.println("节点删除:" + dataPath);
Lock lock = tryActiveLock(lockNode);
synchronized (lockNode) {
if (lock.isActive()) {
lockNode.notify(); // 释放了
}
}
zkClient.unsubscribeDataChanges(upNodePath, this);
}
});
}
return lockNode;
}
public Lock createLockNode(String lockId) {
String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
return new Lock(lockId, nodePath);
}
}
示例Demo涉及组件zookeeper-lock。源代码提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
部分图片来源于网络,版权归原作者,侵删。
网友评论