一、 Zookeeper
项目地址:https://github.com/bao17634/zookeeper_lock.git
1.1 zookeeper简介:
- zookeeper是一个分布、开源的应用程序协调服务,是集群的管理者,监视着集群中各个节点的状态,根据节点的反馈进行下一步合理操作。主要解决的是分布式应用经常遇到的数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
-
zookeeper组成:
zookeeper组成图
1.2 zookeeper功能:
提供文件系统及通知机制。
1.2.1 文件系统:
zookeeper维护一个类似文件系统的数结构,如下图:
每子目录如NameService都被作为zknode,和文件一样,可以自由增加及删除,唯一不同的是他可以用来存储数据。zknode分为四种类型:
- PERSISTENT: 持久化目录节点。(客户端与zookeeper断开连接后,该节点依旧存在)。
- PERSISTENT_SEQUENTIAL: 持久化顺序编号目录节点。(客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号)
- EPHEMERAL: 临时目录节点(客户端与zookeeper断开连接后,该节点被删除)
- EPHEMERAL_SEQUENTIAL: 临时顺序编号目录节点。(客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号)
1.2.2 通知机制: 客户端注册监听他关心的目录节点,当目录节点发生变化(数据改变、被删除、目录节点增加删除)时,zookeeper会通知客户端。
集群架构如图:
二、 zookeeper集群
2.1 zookeeper集群搭建教程: https://www.jianshu.com/p/a5fda39f20d0
2.2 集群简介:
为了保证高可用,zookeeper需要以集群形态来部署,这样只要集群中大部分机器是可用的(能够容忍一定的机器故障),那么zookeeper本身仍然是可用的。客户端在使用zookeeper时,需要知道集群机器列表,通过与集群中的某一台机器建立TCP连接来使用服务,客户端使用这个TCP链接来发送请求、获取结果、获取监听事件以及发送心跳包。如果这个连接异常断开了,客户端可以连接到另外的机器上。这样客户端的读请求就可以被任意一台机器处理,如果请求在节点上注册了监听器,这个监听器也是由所连接的zookeeper机器来处理。
集群架构如下图:
说明: 对于写请求,这些请求会同时发给其他zookeeper机器并且达成一致后,请求才会返回成功,因此随着zookeeper的集群增多,读请求的吞吐量提高但是写请求的吞吐量会下降。
2.3 zookeeper分布式锁实现
2.3.1 实现流程:
-
首先在zookeeper创建根节点,也就是持久节点(rootNode)。
-
在根节点(rootNode)下创建临时节点(node_n):
1) 如果当前创建的节点的序号是最小话,就认定该客户端已经获得锁。
2)如果不是最小的节点,说明获取锁失败,此时客户端就要找比自己小的节点,对其注册事件监听器。
-
获取锁的客户端在执行完业务就删除当前最小节点。
-
删除最小节点后客户端会通过监听器收到通知,然后再次判断是否自己的节点最小,是的话进行加锁,不是的话重复上述操作。
-
加锁流程图:
2.3.2 分布式锁代码实现(利用curator实现):
1)引入依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
2)编写配置类:
@Configuration
public class CuratorConfiguration {
@Autowired
ConfigProperties configProperties;
@Bean(initMethod = "start")
public CuratorFramework curatorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(configProperties.getElapsedTimeMs(), configProperties.getRetryCount());
return CuratorFrameworkFactory.newClient(
configProperties.getConnectString(),
configProperties.getSessionTimeoutMs(),
configProperties.getConnectionTimeoutMs(),
retryPolicy);
}
}
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
创建zk连接,该方法要求传入的三个参数分别是:ip:端口、会话超时时间、连接超时时间、重试策略(由public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
方法提供,其要求传入的参数粉分别是: 重试间隔时间、重试次数)
4)curator的分布式锁:
curator提供了四种分布式锁,分别是:
image- InterProcessMutex:分布式可重入排它锁
- InterProcessSemaphoreMutex:分布式排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
1> 分布式可重入排它锁实现:
代码:
public ApiResult processMutex() throws Exception {
log.info("第{}个线程请求锁", ++count);
try {
// 创建分布式可重入排他锁,根节点为ROOT_LOCK_NODE
InterProcessMutex mutex = new InterProcessMutex(curatorFramework, ROOT_LOCK_NODE );
//加锁
if (mutex.acquire(TIME_OUT, TimeUnit.SECONDS)) {
try {
log.info("获得锁线程为:{}", ++number);
return ApiResult.ok("加锁成功");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
log.info("解锁");
//释放锁
mutex.release();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
throw new RuntimeException("拿到锁失败");
}
2> 分布式排它锁实现:
代码:
public ApiResult semaphoreMutex() throws Exception {
log.info("第{}个线程请求锁", ++count);
try {
// 创建分布式排他锁,根节点为ROOT_LOCK_NODE
InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curatorFramework, ROOT_LOCK_NODE);
//加锁
if (mutex.acquire(TIME_OUT, TimeUnit.SECONDS)) {
try {
log.info("获得锁线程为:{}", ++number);
return ApiResult.ok("加锁成功");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//解锁
mutex.release();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
throw new RuntimeException("拿到锁失败");
}
2> 分布式读写锁实现:
代码:
public ApiResult readWriteLock() throws Exception {
log.info("第{}个线程请求锁", ++count);
try {
// 创建分布式读写锁,根节点为ROOT_LOCK_NODE
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curatorFramework, ROOT_LOCK_NODE);
//创建读锁
InterProcessMutex readLock = readWriteLock.readLock();
//创建写锁
InterProcessMutex writeLock = readWriteLock.writeLock();
try {
//注意只有先得到写锁在得到读锁,不能反过来
if (!writeLock.acquire(TIME_OUT, TimeUnit.SECONDS)) {
throw new RuntimeException("得到写锁失败");
}
log.info("已经得到写锁");
if (!readLock.acquire(TIME_OUT, TimeUnit.SECONDS)) {
throw new RuntimeException("得到读锁失败");
}
log.info("已经得到读锁");
log.info("获得锁线程数为:{}", ++number);
return ApiResult.ok("加锁成功");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//解锁
writeLock.release();
readLock.release();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
三、分布式锁测试情况:
3.1 测试环境:
- 系统:centoOs
- 软件:jemeter
- 测试压力:1 x 10000、100 x 100、200 x 100、500 x 100
- 说明:测试压力=线程数 x 每个线程循环数
3.2 可重入排它锁:
1 x 10000压力测试 100 x 100压力测试 200 x 100压力测试 500 x 100压力测试3.3 排他锁:
1 x 10000压力测试 100 x 100压力测试 200 x 100压力测试 500 x 100压力测试3.4 读写锁:
1 x 10000 压力测试 100 x 100 测试压力 200 x 100 测试压力 500 x 100 测试压力四、总结:
本文主要介绍了使用curator来实现zookeeper分布式锁,在研究zookeeper过程中我也写了利用zookeeper原生API来实现分布式锁,但是实现过程比较复杂,在测试的时候单线程虽然性能与上面三种性能差不多,但是在多线程测试中性能就非常差,所以这里就没有重点去介绍,具体实现请移步我的github。
测试结果分析(主要考虑多线程):
- 性能指数: 可重入排它锁(2.992) > 排它锁(2.98) > 读写锁(2.974)
- 执行时间: 可重入排它锁(995) < 读写锁(1638.93) < 排它锁(1477)
- 请求失败率 可重入排它锁(0)=排它锁(0)< 读写锁(0.28%)
- 综合分析: 从上面的测试结果来看三者个方案在低并发下性能差距不大,但是从高并发测试结果来看可重入排它锁很明显优于其他两种方案,所以从整体上性能上看:可重入排它锁 > 排它锁 > 读写锁
网友评论