使用zookeeper实现
使用zookeeper实现分布式下锁的抢占。
使用curator实现
添加maven依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
需要注意的是4.0.0版本依赖zk3.5.3-beta版本,如果运行的zk不是这个版本需要引入对应兼容的版本。
public class DistributedLockUtil {
// 集群地址
public static final String ZOOKEEPER_STRING = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
// 可重入锁实现类
private InterProcessMutex lock;
// 锁的路径
private String lockPath = "/locks";
public DistributedLockUtil(CuratorFramework client) {
this.lock = new InterProcessMutex(client, lockPath);
}
public InterProcessMutex getLock() {
return lock;
}
public static void main(String[] args) {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZOOKEEPER_STRING,
10000,// session断开保留时长
10000,// 链接超时
new RetryForever(10000)// 重试机制
);
curatorFramework.start();
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
DistributedLockUtil distributedLockUtil = new DistributedLockUtil(curatorFramework);
InterProcessMutex lock = distributedLockUtil.getLock();
try {
lock.acquire();
System.out.println(Thread.currentThread().getName() + "-获得锁!");
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread:" + i).start();
countDownLatch.countDown();
}
}
}
curator
提供InterProcessMutex
实现分布式可重入锁的机制。
接下来解读下使用Zookeeper实现分布式锁的常见逻辑。
ZK的节点
zookeeper数据以树状节点的形式存储,并且相同目录下节点不糊会重复,其中节点类型分为
- 持久节点:客户端断开链接节点不会被删除
- 持久有序节点:客户端断开链接节点不会被删除,且创建的节点有顺序序号。
- 临时节点:客户端断开链接节点被删除
- 临时有序节点:客户端断开链接节点被删除,且创建的节点有顺序序号。
watcher机制
客户端注册监听它关心的目录节点,包括数据变更、删除、新增、子节点变化等,zookeeper会触发通知客户端。并且每个监听的watcher只会触发一次。
公平锁
基于ZK几点的特性,以及watcher机制,能够很方便的实现公平锁的竞争。如下:
客户端在父节点/locks下创建临时有序节点,获取/locks节点下所有子节点,判断自己是否为序号最小节点,如果是最小序号,则获得锁,如果不是,则监听比自己节点小一号的节点,添加删除的watcher事件。当获得锁的客户端释放锁或则断开链接,则会触发它的下一个节点的watcher事件,让下一个节点获得区锁。
由于watcher事件是一次性的,如果当前客户端的上一个节点,还没获得锁就已经断开链接,这时也会触发watcher事件,而这时并不是锁释放的型号,所以代码逻辑里需要在watcher事件处理里再次判断当前节点是否为最小节点,如果不是继续监听比自己小的上一个节点。这样子就形成了一个按客户端创建节点顺序排列的队列,实现锁的公平竞争。
惊群效应:
这里有一点需要注意下,如果我们所有为获得锁的客户端的watcher事件监听的是/locks节点的子节点的变化事件。那么如果这时候有上千个客户端在等待获得锁,那么一档持有锁的客户端释放锁,所有等待的客户端都会触发watcher事件,这显然是不合理的,这时并不是所有的客户端都需要重新发起锁的获取。这就是所谓的惊群效应。
我们可以跟踪下curator
的InterProcessMutex
实现原理,其逻辑就是公平锁的处理思想,主要逻辑在LockInternals#internalLockLoop
// 上一步已创建临时节点,ourPath
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//获得有序的所有最节点信息
List<String> children = getSortedChildren();
// 当前客户端序号
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 判断是否为最小节点,是则获得锁,否则监听
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
//监听比自己小的上一个节点的事件
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
// 等待,当watcher事件触发时唤醒,继续while判断
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
// 异常释放锁
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
代码比较建档,看下注释可以跟踪代码的实现
非公平锁
上线讲到的是公平锁竞争的实现,那么如何实现非公平锁的竞争呢。其实很简单,我们只需要稍微改变下思路即可。首先还是获得锁的节点还是临时节点,但是非公平锁只有一个节点/lock。所有客户端都尝试创建这个/lock节点,如果创建失败,即节点已经存在。则添加/lock节点的watcher事件,当触发watcher事件时,所有需要竞争锁的客户端,再次尝试创建节点。这样就是先了锁的非公平竞争。
网友评论