package com.wanna.zk.zkstudy;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
/**
* 利用zk实现分布式锁思路:使用临时顺序节点
* 1.在执行加锁之前,先判断/LOCKS有没有被创建,如果没有则先创建
* 2.如果创建了/LOCKS,则创建/LOCKS/LOCK_xxxxxxxx这样子的临时顺序节点
* 3.尝试去加锁的逻辑,先获取/LOCKS/下所有的孩子节点,并进行排序
* ---3.1如果当前锁在的位置是0,也就是第一个元素,那么就获取锁成功,开始执行业务代码
* ---3.2如果当前锁所在的位置不是0,那么就获取它的前一个节点,并进行监控
* ------如果前一个节点已经被删了,那么就继续尝试去获取锁
* ------如果前一个节点还没被删,那么就阻塞当前线程,直到监听器(Watcher)对象将其唤醒才继续执行
* 4.解锁逻辑,删除自己创建的LOCK这条记录,因为是临时顺序节点,因此就算机器宕机了也会自动删除锁
*/
public class ZKLock {
private static final String LOCK_ROOT = "/LOCKS"; //锁的根路径
private static final String LOCK_NODE_NAME = "LOCK_"; //锁的名称,使用临时顺序节点
private String lockPath; //完整的锁路径
//集群的连接字符串,机器ip:port中间用逗号分隔即可
String connectString = "localhost:2181,localhost:2182,localhost:2183";
private ZooKeeper zooKeeper; //zk对象
private final Thread currentThread = Thread.currentThread(); //获取创建对象的线程,用来进行唤醒和阻塞
private int zkSessionTimeout = 5000; //sessionTimeout
//判断某个元素是否被删除,如果删除了就将当前线程唤醒
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//如果获取到删除节点的事件,那么就唤醒当前线程
if (event.getType() == Event.EventType.NodeDeleted) {
LockSupport.unpark(currentThread); //将当前线程唤醒
}
}
};
public ZKLock() {
try {
zooKeeper = new ZooKeeper(connectString, zkSessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None) {
//如果链接建立成功,打印相关信息
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("ZK连接建立成功");
LockSupport.unpark(currentThread);
}
}
}
});
LockSupport.park(); //因为是异步去进行连接,因此这里需要等待
} catch (Exception e) {
e.printStackTrace();
}
}
public void lock() {
try {
createLock(); //createLock
attemptLock(); //尝试去获取锁
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
public void createLock() throws KeeperException, InterruptedException {
//判断LOCKS节点是否存在
final Stat exists = zooKeeper.exists(LOCK_ROOT, false);
//如果不存在则创建一个/LOCKS的持久节点
if (exists == null) {
zooKeeper.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//创建临时有序节点,节点为/LOCKS/LOCK_xxxxxxxx
lockPath = zooKeeper.create(LOCK_ROOT.concat("/").concat(LOCK_NODE_NAME), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("LOCK NODE ".concat(lockPath).concat(" has been created"));
}
//尝试去获取锁
public void attemptLock() throws KeeperException, InterruptedException {
//获取/LOCKS节点的孩子节点
final List<String> children = zooKeeper.getChildren(LOCK_ROOT, false);
//对children节点进行排序
Collections.sort(children);
//获取当前锁节点排序之后的下标,截取掉/LOCKS/之后的内容
final int index = children.indexOf(lockPath.substring(LOCK_ROOT.length() + 1));
//如果获取到的index为0,也就是第一个元素
if (index == 0) {
System.out.println("获取LOCK成功");
return;
}
//如果不是第一个元素,获取上一个节点的路径
final String s = children.get(index - 1);
//监视它的上一个元素的变化情况,传入watcher对象
final Stat exists = zooKeeper.exists(LOCK_ROOT.concat("/").concat(s), watcher);
//继续去获取锁
if (exists != null) { //如果exists不为null,那么就阻塞,不然就尝试去获取锁
LockSupport.park();
}
attemptLock();
}
public void unlock() {
//删除锁的临时有序节点,并且关闭连接对象
try {
zooKeeper.delete(lockPath, -1);
zooKeeper.close();
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
网友评论