1.步骤
- 每个客户端往持久化节点/parent_lock下创建有序临时节点/parent_lock/lock_。创建成功后/parent_lock下面会有每个客户端对应的节点,如/parent_lock/lock_000000001
- 客户端取得/parent_lock下子节点,并进行排序,判断排在最前面的是否为自己。
- 如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑
- 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.
- 当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。
- 监听客户端重新执行第2步逻辑,判断自己是否获得了锁。
2.实例
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class ZKLockUtil {
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
//ZooKeeper配置信息
private ZooKeeper zkClient;
private String lockPath;
// 监控lockPath的前一个节点的watcher
private Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath() + " 前锁释放");
System.out.println(event.getState() + " 前锁释放");
synchronized (this) {
notifyAll();
}
}
};
public ZKLockUtil() throws IOException {
zkClient = new ZooKeeper("123.57.251.156:2181,123.57.251.156:2182,123.57.251.156:2183", 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("失去连接");
}
}
});
}
//获取锁的原语实现.
public void acquireLock() throws InterruptedException, KeeperException {
//创建锁节点
createLock();
//尝试获取锁
attemptLock();
}
//创建锁的原语实现。在lock节点下创建该线程的锁节点
private void createLock() throws KeeperException, InterruptedException {
//如果根节点不存在,则创建根节点
Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建EPHEMERAL_SEQUENTIAL类型节点
String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
this.lockPath = lockPath;
}
private void attemptLock() throws KeeperException, InterruptedException {
// 获取Lock所有子节点,按照节点序号排序
List<String> lockPaths = null;
lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
Collections.sort(lockPaths);
for (String s :
lockPaths ) {
System.out.println("lockPaths: "+s);
}
System.out.println("当前最小节点" + lockPath.substring (LOCK_ROOT_PATH.length() + 1));
int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
// 如果lockPath是序号最小的节点,则获取锁
if (index == 0) {
System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
return;
} else {
// lockPath不是序号最小的节点,监控前一个节点
String preLockPath = lockPaths.get(index - 1);
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
// 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
if (stat == null) {
attemptLock();
} else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
System.out.println(" 等待前锁释放,prelocakPath:" + preLockPath);
synchronized (watcher) {
watcher.wait();
}
attemptLock();
}
}
}
//释放锁的原语实现
public void releaseLock() throws KeeperException, InterruptedException {
zkClient.delete(lockPath, -1);
zkClient.close();
System.out.println(" 锁释放:" + lockPath);
}
}
public class TicketSeller {
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
TicketSeller ticketSeller = new TicketSeller();
for (int i = 0; i < 8; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// System.out.println("开始执行第"+i+"个");
try {
ticketSeller.sellTicketWithLock();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("-------------------------");
}
}).start();
}
}
private void sell() {
System.out.println("售票开始");
// 线程随机休眠数毫秒,模拟现实中的费时操作
int sleepMillis = (int) (Math.random() * 2000);
try {
//代表复杂逻辑执行了一段时间
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("售票结束");
}
public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
ZKLockUtil lock = new ZKLockUtil();
lock.acquireLock();
sell();
lock.releaseLock();
}
}
分布式锁 | 优点 | 缺点 |
---|---|---|
Zookeeper | 1.有封装好的框架,容易实现 2.有等待锁的队列,大大提升抢锁效率。 |
添加和删除节点性能较低 |
Redis | Set和Del指令性能较高 | 1.实现复杂,需要考虑超时,原子性,误删等情形。 2.没有等待锁的队列,只能在客户端自旋来等待,效率低下。 |
3.总结
其实如果有客户端C、客户端D等N个客户端争抢一个zk分布式锁,原理都是类似的。
大家都是上来直接创建一个锁节点下的一个接一个的临时顺序节点
如果自己不是第一个节点,就对自己上一个节点加监听器
只要上一个节点释放锁,自己就排到前面去了,相当于是一个排队机制。
而且用临时顺序节点的另外一个用意就是,如果某个客户端创建临时顺序节点之后,不小心自己宕机了也没关系,zk感知到那个客户端宕机,会自动删除对应的临时顺序节点,相当于自动释放锁,或者是自动取消自己的排队。
网友评论