描述
zookeeper实现分布式锁原理:多个客户端去节点下分别创建临时顺序的节点。按顺序最小的节点对应的客户端获取锁。没有获取锁的客户端对前一个节点增加监听节点删除事件。释放锁时将节点删除,此时服务器发送监听事件通知客户端去获取锁。
思考
1.分布式锁包含本地锁和共享资源锁。例如一个客户端多个线程去获取分布式锁,应该先用本地锁控制,在放一个线程去获取共享资源锁。
2.zookeeper移除watch,客户端通过removeWatches()
实现。入参需要指定WatcherType。
Watcher.WatcherType:Children,Data,Any
通过getChildren()添加的watch,使用WatcherType是Children
通过exists()/getData()添加的watch,使用WatcherType是Data
代码实现
下面的分布式锁,没有本地锁。
public class ZkDistributedLock {
private static final String lockPrefix = "/update";
private static final int retryTime = 3;
private ZooKeeper zooKeeper;
private String znodeRoot;
private AtomicReference<String> znodeLock = new AtomicReference<>(null);
public ZkDistributedLock(ZooKeeper zooKeeper, String znodeRoot) {
this.znodeRoot = znodeRoot;
this.zooKeeper = zooKeeper;
try {
if (zooKeeper.exists(this.znodeRoot, false) == null) {
zooKeeper.create(znodeRoot, znodeRoot.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
public void lock() throws Exception {
String myLock = zooKeeper.create(znodeRoot + lockPrefix, lockPrefix.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
if (tryAcquire(myLock) != null) {
System.out.println(Thread.currentThread().getName() + " lock " + myLock);
}
}
public String tryAcquire(String myZnode) throws Exception {
while (true) {
List<String> children = zooKeeper.getChildren(znodeRoot, false);
Collections.sort(children);
String myLockString = myZnode.substring(8);
if (myLockString.equals(children.get(0))) {
znodeLock.set(myZnode);
break;
} else {
CountDownLatch countDownLatch = new CountDownLatch(1);
int i = children.indexOf(myLockString);
Stat exists = zooKeeper.exists(znodeRoot + "/" + children.get(i - 1), event -> {
if (Watcher.Event.EventType.NodeDeleted == event.getType()) {
countDownLatch.countDown();
}
});
// 如果watch的节点已经删除,则移除watch和countDownLatch,继续获取锁
if (exists == null) {
/**
* 移除watch
* Watcher.WatcherType:Children,Data,Any
* 通过getChildren()添加的watch,使用WatcherType是Children
* 通过exists()/getData()添加的watch,使用WatcherType是Data
*
* https://colobu.com/2014/09/28/ZooKeeper-Programmers-Guide/#Remove_Watches
*/
zooKeeper.removeAllWatches(znodeRoot + "/" + children.get(i - 1), Watcher.WatcherType.Data, true);
countDownLatch.countDown();
}
System.out.println(Thread.currentThread().getName() + "_znode:" + myZnode + "_wait:" + children.get(i - 1));
countDownLatch.await();
}
}
return znodeLock.get();
}
public void unLock() {
int actualTime = 0;
do {
try {
zooKeeper.delete(znodeLock.get(), -1);
System.out.println(Thread.currentThread().getName() + " unlock " + znodeLock.get());
znodeLock.set(null);
break;
} catch (Exception e) {
if (++actualTime == retryTime) {
e.printStackTrace();
}
}
} while (actualTime < retryTime);
}
}
测试
public static void main(String[] args) throws Exception {
String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
ZkCli zkCli = new ZkCli(connectString);
ZooKeeper zooKeeper = zkCli.getZooKeeper();
ZkDistributedLock lock = new ZkDistributedLock(zooKeeper, "/myLock");
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
try {
lock.lock();
// Thread.sleep(1000);
System.out.println("do some thing");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unLock();
}
});
}
pool.shutdown();
}
引用
http://www.dengshenyu.com/java/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/10/23/zookeeper-distributed-lock.html
https://colobu.com/2014/09/28/ZooKeeper-Programmers-Guide/#Remove_Watches
https://juejin.im/post/5bbb0d8df265da0abd3533a5
https://www.cnblogs.com/cc11001100/p/10269494.html
网友评论