利用Zookeeper来实现分布式锁,主要基于其临时(或临时有序)节点和watch机制.
为什么是临时节点?
临时节点的特性,在连接断开的情况下节点能被删除,所以即使客户端发生故障锁也能被释放,避免死锁的发生.
为什么是有序节点?
当然不用有序节点也是可以实现的.每个客户端尝试创建同一个临时节点,创建者获得锁,创建失败的客户端监听这个锁节点.
但是当客户端太多的时候,会形成羊群效应,因为只有一个客户端能获取锁,其他客户端都因失败而需要监听这个锁节点的删除事件,当获得锁的客户端完成业务后释放锁即删除这个锁节点时,zk要给所有监视的客户端发送通知,这样大量的消息通知可能会造成ZK的阻塞.
在这种场景下,更优化的方式是使用有序节点.
每个未获得锁的客户端只需要监听排在他前面的那个节点,每次节点删除也只需要通知一个客户端即可.
Curator(Zookeeper的Java客户端)就是用的临时有序节点和watch机制来实现分布式锁的.
步骤如下(这里本来想画一张图的,但画图能力有限,还没有画出满意的图来)
- 每个客户端基于节点/mylock创建临时有序子节点/mylock/lock-,比如第一个创建的/mylock/lock-0000000000,第二个/mylock/lock-0000000001......
- 客户端获取/mylock节点的子节点列表并按升序排序,判断自己创建的节点是否排在第一个.如果排在第一个则表示获得锁,否则监听前一个节点的删除事件.
- 获得锁的客户端进行业务处理.完成后删除子节点,释放锁.监听该子节点的客户端收到通知,尝试获取锁.
针对上述步骤考虑几个场景
场景1. 比如当前获得锁的节点是/mylock/lock-0000000000,而节点/mylock/lock-0000000001还没有对/mylock/lock-0000000000设置好监听事件的时候/mylock/lock-0000000000节点删除了
/mylock/lock-0000000001对应的客户端对/mylock/lock-0000000000设置监听的时候,如果该节点删除了会抛出一个NoNodeException异常;这个时候可以生吞这个异常重新尝试获取锁.
场景2. 比如当前获得锁的节点是/mylock/lock-0000000000,而节点/mylock/lock-0000000005对应的客户端突然宕机了,该节点被删除;
这个时候创建/mylock/lock-0000000006节点的客户端会收到节点删除的通知,然后尝试获取锁,发现自己获取不到锁,则监听/mylock/lock-0000000004子节点的删除事件.
编码实现分布式锁
看到这才是实现分布式锁的正确姿势!这篇文章才知道原来Spring早就为我们提供了分布式锁的实现了.不过其实也是用的Curator来实现的啦.看下依赖关系就知道啦.
重点看下类org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry
Spring封装之后使用也非常简单,大概步骤就是这样:
Lock lock = zookeeperLockRegistry.obtain("lock-xh");
//tryLock()zk内部实现的是一个超时接口
if (lock.tryLock()) {
//业务逻辑
lock.unlock();
}
我这里写了一个Demo,代码在github上 sb-learn-distributedlock-zk
模拟2个线程争抢这个锁lock-xh
,让线程B先启动,线程A休眠一段时间再启动.然后可以得到线程B抢到了锁.
通过zkCli.sh
查看锁节点,发现在/SpringIntegration-LockRegistry/lock-xh
节点下创建了2个临时有序节点.
[zk: 10.45.82.76(CONNECTED) 2] ls /SpringIntegration-LockRegistry/lock-xh
[_c_413a0764-4abe-4476-b241-a33f9a4af228-lock-0000000009, _c_11e2b7fb-ca26-4a4c-8832-b3d8e8b741de-lock-0000000008]
其实从这里也能看出来,他是利用了zk的临时有序节点来实现的.两个线程都到这个锁节点下创建子节点.然后按照顺序谁排前面谁就获得了锁.
注意
我开始编码的时候,spring-boot-starter-parent
用了2.2.0.BUILD-SNAPSHOT
版本,其依赖的curator
为4.0.1
版本,而我连的zk版本是3.4.11
.所以我测试的时候报错了:
Caused by: org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /SpringIntegration-LockRegistry/lock-xh/_c_48572564-d1d3-4134-9491-b359d756acc2-lock-
at org.apache.zookeeper.KeeperException.create(KeeperException.java:103)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1525)
at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1181)
at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1158)
at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
at org.apache.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:1155)
at org.apache.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:605)
at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:595)
at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:573)
at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:49)
at org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver.createsTheLock(StandardLockInternalsDriver.java:54)
at org.apache.curator.framework.recipes.locks.LockInternals.attemptLock(LockInternals.java:225)
at org.apache.curator.framework.recipes.locks.InterProcessMutex.internalLock(InterProcessMutex.java:237)
at org.apache.curator.framework.recipes.locks.InterProcessMutex.acquire(InterProcessMutex.java:108)
at org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry$ZkLock.tryLock(ZookeeperLockRegistry.java:300)
... 3 more
是的!要注意服务端zk和客户端Curator
版本的兼容性,具体请看这里ZooKeeper Version Compatibility
关于获取锁的逻辑,重点看下Curator
的org.apache.curator.framework.recipes.locks.LockInternals
,我这里用的Curator版本4.0.1
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
//省略其他代码
//创建临时有序节点 如果父节点没有也同时创建
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
//阻塞直到获得锁,或者等待时间过了退出或者线程中断退出
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
//省略其他代码
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if(this.revocable.get() != null) {
((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
//不断循环尝试获得锁
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
//获取节点下的所有子节点并排序
List<String> children = this.getSortedChildren();
//ourPath如/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
//basePath如/SpringIntegration-LockRegistry/lock-xh
//得到当前线程创建的有序节点名称 比如_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
//检查当前节点是否排在第一个,如果排在第一个则获得锁,如果没有获得锁,则寻找需要监视的节点(即有序节点列表中排在当前节点前面的那个节点)
PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
//获得锁啦
if(predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
//未获得锁,需要对前一个节点进行监视
//得到前一个有序节点的qu路径
String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
//设置监视器; 这里有一种场景,即设置监视器的时候可能上一个节点已经被删除了.对于这种情况,会抛出NoNodeException异常;
//下面直接生吞了这种异常.继续循环尝试获得锁.
//这里使用getData()接口而不是checkExists()是因为,如果前一个子节点已经被删除了那么会抛出异常而且不会设置事件监听器,
//而checkExists虽然也可以获取到节点是否存在的信息但是同时设置了监听器,这个监听器其实永远不会触发,对于zookeeper来说属于资源泄露
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
if(millisToWait == null) {
this.wait();
} else {
millisToWait = Long.valueOf(millisToWait.longValue() - (System.currentTimeMillis() - startMillis));
startMillis = System.currentTimeMillis();
if(millisToWait.longValue() > 0L) {
this.wait(millisToWait.longValue());
} else {
doDelete = true;
break;
}
}
} catch (NoNodeException var19) {
;
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
doDelete = true;
throw var21;
} finally {
if(doDelete) {
this.deleteOurPath(ourPath);
}
}
return haveTheLock;
}
注意
上面关于锁节点,可能会有点迷糊,为什么中间会有一串随机数?
org.apache.curator.framework.imps.CreateBuilderImpl#adjustPath
比如/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
是在/SpringIntegration-LockRegistry/lock-xh/lock-的基础上调整而来的,分成路径/SpringIntegration-LockRegistry/lock-xh和节点lock-
然后节点lock-前面拼接上"_c_" + protectedId + "-"
比如这里的protectedId=35957bd7-a9e9-4f6f-a9f3-c131b9c3734c
最后拼成的节点全路径即:/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-
最后再创建有序节点的时候尾巴上补上了有序序列号0000000000
那么我们现在知道了这个节点的名称的创建逻辑,那么既然这是串随机数,我们怎么能保证先创建的节点就能排在前面呢?
其实原因就在上面源码中getSortedChildren方法,里面排序的时候并不是按照整个节点名称比如_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000来排序的,而是按照后面的有序序列号比如0000000000来排序的!
//获取节点下的所有子节点并排序
List<String> children = this.getSortedChildren();
关于zk实现分布式锁的学习资料
7 张图讲清楚ZooKeeper分布式锁实现原理
这才是实现分布式锁的正确姿势!
网友评论