1.Curator 介绍
-
摘录官网的介绍
Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a high-level API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
2.依赖
<!-- zookeeper -->
<!-- ZooKeeper client library; the lock service below imports CreateMode and ZooDefs from it. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
<!-- Curator core: CuratorFramework / CuratorFrameworkFactory and the retry policies. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- Curator recipes: provides PathChildrenCache, used to watch the lock nodes. -->
<!-- NOTE(review): keep both Curator artifacts on the same version; confirm the Curator/ZooKeeper version pairing against the Curator compatibility notes. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
3.配置文件
- application.properties 中添加如下配置
# NOTE: comments must be on their own line. In a .properties file a '#' that
# follows a value is part of the value, which breaks int binding and the
# connect string.
# Number of retries
curator.retryCount=5
# Sleep between retries, in milliseconds
curator.elapsedTimeMs=5000
# ZooKeeper address
curator.connectString=127.0.0.1:2181
# Session timeout, in milliseconds
curator.sessionTimeoutMs=60000
# Connection timeout, in milliseconds
curator.connectionTimeoutMs=5000
4.配置类编写
- 1)配置类
package com.zxr.micro.config.zookeeper;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Holder for the Curator client settings.
 *
 * <p>Registered as a Spring component and populated from the properties that start with
 * the {@code curator} prefix (for example {@code curator.retryCount}). Lombok's
 * {@code @Data} generates the getters and setters that the binding and
 * {@code ZkConfiguration} rely on.
 */
@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class WrapperZk {
// Number of retries; passed as the first argument of RetryNTimes in ZkConfiguration.
private int retryCount;
// Second argument of RetryNTimes in ZkConfiguration (configured as the retry interval).
private int elapsedTimeMs;
// ZooKeeper connect string, e.g. 127.0.0.1:2181.
private String connectString;
// Session timeout handed to CuratorFrameworkFactory.newClient.
private int sessionTimeoutMs;
// Connection timeout handed to CuratorFrameworkFactory.newClient.
private int connectionTimeoutMs;
}
- 2)Curator 客户端配置类
package com.zxr.micro.config.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that publishes the Curator client as a bean.
 */
@Configuration
public class ZkConfiguration {

    /** Connection and retry settings bound from the {@code curator.*} properties. */
    @Autowired
    WrapperZk wrapperZk;

    /**
     * Creates the Curator client from the configured connection settings.
     *
     * <p>Because of {@code initMethod = "start"}, Spring calls {@code start()} on the
     * returned instance once the bean has been created.
     *
     * @return the client produced by {@link CuratorFrameworkFactory#newClient}
     */
    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        final String connectString = wrapperZk.getConnectString();
        final int sessionTimeoutMs = wrapperZk.getSessionTimeoutMs();
        final int connectionTimeoutMs = wrapperZk.getConnectionTimeoutMs();
        // Retry policy built from the configured retry count and interval.
        final RetryNTimes retryPolicy =
                new RetryNTimes(wrapperZk.getRetryCount(), wrapperZk.getElapsedTimeMs());
        return CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
    }
}
5.分布式锁实现
package com.zxr.micro.config.zookeeper;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Service
public class DistributedLockByZookeeper implements InitializingBean{
private final static String ROOT_PATH_LOCK = "rootlock";
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Autowired
private CuratorFramework curatorFramework;
/**
* 获取分布式锁
*/
public void acquireDistributedLock(String path) {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
while (true) {
try {
curatorFramework
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(keyPath);
log.info("success to acquire lock for path:{}", keyPath);
break;
} catch (Exception e) {
log.info("failed to acquire lock for path:{}", keyPath);
log.info("while try again .......");
try {
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
countDownLatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
/**
* 释放分布式锁
*/
public boolean releaseDistributedLock(String path) {
try {
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
}
} catch (Exception e) {
log.error("failed to release lock");
return false;
}
return true;
}
/**
* 创建 watcher 事件
*/
private void addWatcher(String path) throws Exception {
String keyPath;
if (path.equals(ROOT_PATH_LOCK)) {
keyPath = "/" + path;
} else {
keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
}
final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String oldPath = event.getData().getPath();
log.info("上一个节点 "+ oldPath + " 已经被断开");
if (oldPath.contains(path)) {
//释放计数器,让当前的请求获取锁
countDownLatch.countDown();
}
}
});
}
//创建父节点,并创建永久节点
@Override
public void afterPropertiesSet() {
curatorFramework = curatorFramework.usingNamespace("lock-namespace");
String path = "/" + ROOT_PATH_LOCK;
try {
if (curatorFramework.checkExists().forPath(path) == null) {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path);
}
addWatcher(ROOT_PATH_LOCK);
log.info("root path 的 watcher 事件创建成功");
} catch (Exception e) {
log.error("connect zookeeper fail,please check the log >> {}", e.getMessage(), e);
}
}
}
6.测试
- 弄两个 GET 请求测一下,看看效果
package com.zxr.micro.controller;
import com.zxr.micro.common.RespMsg;
import com.zxr.micro.config.zookeeper.DistributedLockByZookeeper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo endpoints for trying out the ZooKeeper lock: both contend for the same lock name,
 * hold it for a fixed time and then release it.
 */
@RestController
@RequestMapping("/hello")
public class MicroController {

    @Autowired
    private DistributedLockByZookeeper distributedLockByZookeeper;

    private final static String PATH = "test";

    /**
     * Acquires the lock, holds it for 20 seconds and releases it.
     *
     * @return response wrapping whether the release succeeded
     */
    @GetMapping("/lock1")
    public RespMsg<Boolean> getLock1() {
        Boolean flag = holdLock(20000);
        return RespMsg.success(flag);
    }

    /**
     * Acquires the lock, holds it for 15 seconds and releases it.
     *
     * @return response wrapping whether the release succeeded
     */
    @GetMapping("/lock2")
    public RespMsg<Boolean> getLock2() {
        Boolean flag = holdLock(15000);
        return RespMsg.success(flag);
    }

    /**
     * Acquires the shared demo lock, sleeps for the given time and releases the lock
     * exactly once.
     *
     * @param holdMillis how long to keep the lock, in milliseconds
     * @return the result of the release call
     */
    private Boolean holdLock(long holdMillis) {
        distributedLockByZookeeper.acquireDistributedLock(PATH);
        boolean interrupted = false;
        Boolean released;
        try {
            Thread.sleep(holdMillis);
        } catch (InterruptedException e) {
            interrupted = true;
        } finally {
            // Single release on every path. Releasing twice could delete a lock node that
            // another request created in the meantime.
            released = distributedLockByZookeeper.releaseDistributedLock(PATH);
        }
        if (interrupted) {
            // Restore the flag only after the release so the ZooKeeper call is not
            // aborted by the pending interrupt.
            Thread.currentThread().interrupt();
        }
        return released;
    }
}
7.注意
- DistributedLockByZookeeper 中 afterPropertiesSet 里的初始化逻辑,也可以改为通过 @Bean 的 initMethod 属性来触发,这样就不必实现 InitializingBean 接口。
网友评论