原理
zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁。(注意同目录的有序节点的序号是递增的)如图:
![](https://img.haomeiwen.com/i23281927/1de3e9588aa31b13.png)
curator框架实现(整合springboot):
PS:zooKeeper用的版本3.5.6,springboot2.3.11
POM:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
<exclusions>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.6</version>
</dependency>
代码:
#配置
@Bean(name = "curatorFramework")
public CuratorFramework curatorFramework() {
/*
* 重试的间隔时间,毫秒
*/
int elapsedTimeMs = 5000;
/*
* 连接重试次数
*/
int retryCount = 3;
/*
* 连接超时时间,毫秒
*/
int connectionTimeoutMs = 5000;
/*
* session超时时间,毫秒
*/
int sessionTimeoutMs = 60000;
/*
* zookeeper连接地址
*/
String connectString = "192.168.6.59:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(elapsedTimeMs, retryCount);
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(retryPolicy)
.build();
curatorFramework.start();
return curatorFramework;
}
#使用
@GetMapping("/zkLock/{id}")
public void getKeyWithCurator(@PathVariable Long id) {
InterProcessMutex rlock = null;
try {
Stat stat = curatorFramework.checkExists().forPath(PARENT_PATH);
if (stat == null) {
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(PARENT_PATH);
}
rlock = new InterProcessMutex(curatorFramework, PARENT_PATH + CHILD_PATH);
if (rlock.acquire(10L, TimeUnit.SECONDS)) {
Object value = redisTemplate.opsForValue().get("llc" + id.toString());
if (Objects.nonNull(value)) {
System.out.println(System.currentTimeMillis()+"------------"+ Thread.currentThread()
+ "-------在缓存查询到id=" + id + "的值=" + value.toString());
return;
}
User user = userService.queryById(id);
if (Objects.nonNull(user)) {
redisTemplate.opsForValue().set("llc" + id.toString(), user, 30, TimeUnit.MINUTES);
System.out.println(System.currentTimeMillis()+"------------"+ Thread.currentThread()
+ "-------在数据库查询到id=" + id + "的值=" + user);
return;
}
}
System.out.println(System.currentTimeMillis()+"------------"+ Thread.currentThread()
+ "-------10s也拿不到锁");
return;
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (rlock != null) {
rlock.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
网友评论