InterProcessSemaphoreV2是Curator提供的关键工具之一,通过操作InterProcessSemaphoreV2 API可以实现在分布式场景中如同像在JVM中操作Semaphore一样的功能,具有公平,可重入等多个特性。
tips1: 阅读本节,需要简单了解ZK及Curator的简单知识,任意门:Curator简单介绍
tips2: 阅读本节,建议先阅读分布式锁与分布式信号量实现原理
tips3:分布式信号量中,每个许可被称为lease
下面我们开始阅读源码,看下InterProcessSemaphoreV2如何在分布式系统下实现了信号量。
创建信号量对象
// Example wrapper showing how to use InterProcessSemaphoreV2 as a distributed semaphore.
// Implements SharedCountListener so it is notified when the max-lease count changes.
public class ZkSempahore implements SharedCountListener {
    private static String path = "/semahore/test";
    private InterProcessSemaphoreV2 interProcessSemaphoreV2;

    // Initialization: create and start the ZK client, then build the semaphore.
    public void init() throws Exception {
        // Connection string, retry policy and other required arguments omitted for brevity.
        CuratorFramework client = CuratorFrameworkFactory.newClient();
        // BUG FIX: the client must be started before any ZooKeeper operation;
        // without this call, count.start() below fails with an IllegalStateException.
        client.start();
        // SharedCount stores the maximum number of leases (initially 5) in ZK,
        // so the permit count can be changed at runtime across processes.
        SharedCount count = new SharedCount(client, path, 5);
        count.addListener(this);
        count.start();
        interProcessSemaphoreV2 = new InterProcessSemaphoreV2(client, path, count);
    }

    // Acquire one lease (permit); blocks until one becomes available.
    public Lease acquire() throws Exception {
        return interProcessSemaphoreV2.acquire();
    }

    // Return a previously acquired lease, freeing the permit for other clients.
    public void release(Lease lease) {
        interProcessSemaphoreV2.returnLease(lease);
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        // Invoked when the shared max-lease count changes.
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        // Invoked when the client connection state changes.
    }
}
acquire方法解析
// Acquires qty leases. If unit is null there is no timeout and the call blocks
// until all leases are obtained; otherwise the whole operation must finish
// within the given time budget. Acquisition is all-or-nothing.
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
// unit == null means "no timeout": block indefinitely
boolean hasWait = (unit != null);
long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
Preconditions.checkArgument(qty > 0, "qty cannot be 0");
// collects leases acquired so far; returned in finally if anything fails
ImmutableList.Builder<Lease> builder = ImmutableList.builder();
boolean success = false;
try
{
while ( qty-- > 0 )
{
int retryCount = 0;
long startMillis = System.currentTimeMillis();
boolean isDone = false;
while ( !isDone )
{
// key call: try to acquire one lease; the switch dispatches on the
// result (continue / retry / timed out) — cases elided in this excerpt
switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
{
....
}
}
}
success = true;
}
finally
{
// all-or-nothing semantics: on failure, return every lease acquired so far
if ( !success )
{
returnAll(builder.build());
}
}
return builder.build();
}
acquire方法中第一处有实际意义的代码:通过internalAcquire1Lease尝试获取lease;无参的acquire()相当于qty=1,表示一次只获取一个lease。
进入internalAcquire1Lease看实现
// Tries to acquire exactly one lease in two phases: (1) take the internal
// distributed mutex (lock path), then (2) create an ephemeral-sequential node
// under the lease path and wait until the lease-node count is within maxLeases.
private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
{
// if the client is not started, give up and have the caller return null
if ( client.getState() != CuratorFrameworkState.STARTED )
{
return InternalAcquireResult.RETURN_NULL;
}
if ( hasWait )
{
// compute the remaining wait budget and try the mutex with a timeout
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
{
return InternalAcquireResult.RETURN_NULL;
}
}
else
{
// key step: acquire the internal mutex; blocks until it is obtained
lock.acquire();
}
Lease lease = null;
try
{
PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
// create an ephemeral-sequential node under the lease path (with optional payload)
String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
String nodeName = ZKPaths.getNodeFromPath(path);
lease = makeLease(path);
if ( debugAcquireLatch != null )
{
debugAcquireLatch.await();
}
// synchronized so the watcher's notifyAll() pairs with the wait() calls below
synchronized(this)
{
// loop forever; exits via break (success), return, or exception
for(;;)
{
List<String> children;
try
{ // list all lease nodes, re-arming the watcher for change notifications
children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
}
catch ( Exception e )
{
if ( debugFailedGetChildrenLatch != null )
{
debugFailedGetChildrenLatch.countDown();
}
returnLease(lease); // otherwise the just created ZNode will be orphaned causing a dead lock
throw e;
}
// our node is missing from the children: clean up and signal a retry
if ( !children.contains(nodeName) )
{
log.error("Sequential path not found: " + path);
returnLease(lease);
return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
// success: the number of lease nodes is within the permit limit
if ( children.size() <= maxLeases )
{
break;
}
// timed mode: wait only for the remaining budget, else give up
if ( hasWait )
{
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if ( thisWaitMs <= 0 )
{
returnLease(lease);
return InternalAcquireResult.RETURN_NULL;
}
wait(thisWaitMs);
}
else
{
wait();
}
}
}
}
finally
{
// always release the internal mutex (deletes our lock-path node) so other
// clients blocked in lock.acquire() get a chance to proceed — critical step
lock.release();
}
// record the successfully acquired lease
builder.add(Preconditions.checkNotNull(lease));
return InternalAcquireResult.CONTINUE;
}
此处有几处需要注意
- 获取许可的操作被分为两部分
1.1 首先通过lock.acquire()方法去得到创建lock path 下临时节点的权利,否则会阻塞;
1.2 成功获取lock后需要通过创建lease path下临时节点,并判断lease path下临时节点个数小于令牌个数实现信号量。
可以认为第二步是实现分布式信号量的关键。 - 通过finally中的lock.release()释放在1.1中创建的lock path下临时节点。这一步的必要性在于:当抛出异常或者成功获取lease后,需要释放lock path下的临时节点,让其他被阻塞的客户端解除阻塞,有继续去获取lease的机会。
也许有的朋友会问,为什么要将分布式信号量设计为两步,同时维护lock path和lease path。只维护一个lease path难道不可以吗?后面我会解答这个问题。
下面首先进入第一步,去获取lock锁,进入lock.acquire()方法内部。InterProcessSemaphoreV2内部使用了InterProcessMutex,InterProcessMutex其实是分布式锁与分布式信号量实现原理中分布式锁的实现方案,简单而言,其实现了一个公平的可重入分布式锁。
// Reentrant entry point of InterProcessMutex: if the current thread already
// owns the lock, just bump the hold count; otherwise attempt to acquire it
// within the given time (a null unit means wait forever).
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
Thread currentThread = Thread.currentThread();
// reentrancy support: threadData is a ConcurrentMap from owner thread to lock state
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
// not held by this thread yet: try to take the lock in ZooKeeper
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{ // acquired: remember ownership for reentrancy, then report success
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
} // closing brace of the enclosing class in this excerpt
上面这段代码实现的主要功能是可重入,进入attemptLock,终于可以接近关键的信息了。
// Creates an ephemeral-sequential node under the lock path, then waits (in
// internalLockLoop) until that node is first in line. Returns the node path
// on success, or null on timeout.
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
// every client first creates its own node under the lock path,
// regardless of whether it will end up holding the lock
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// then block until our node is the lowest (lock held) or we time out
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// our node/parent vanished (e.g. session loss): retry per the retry policy
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
上面这段代码,首先为本次申请创建了一个临时node。
// Waits until our node is among the first maxLeases children (for the mutex,
// the very first one). Each client watches only the node immediately ahead of
// its own, avoiding a herd effect. Returns true when the lock is held,
// false if the wait timed out.
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
// all child node names under the lock path, sorted ascending by sequence number
List<String> children = getSortedChildren();
// the sequence node name created by this acquire call
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// decide whether we hold the lock (and if not, which node to watch)
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
// our node is first in line: lock acquired
haveTheLock = true;
}
else
{ // path of the node immediately ahead of ours
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// watch only the previous node; its deletion wakes us up
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{ // acquisition failed or timed out: remove our ephemeral node
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
以上这部分是较为核心代码,其设计思路如同分布式锁与分布式信号量实现原理介绍的,首先判断自己是否获取锁,没有的话就监听并wait前一个node。进入getsTheLock方法内部
// Determines whether our node is within the first maxLeases children. For the
// internal mutex used here, maxLeases is 1, so only the lowest-numbered node
// (index 0) obtains the lock.
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
// position of our node in the sorted list of children
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
// we hold the lock iff fewer than maxLeases nodes are ahead of us
boolean getsTheLock = ourIndex < maxLeases;
// null when we hold the lock; otherwise the node maxLeases places ahead of us, to watch
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
// package the decision and the node to watch
return new PredicateResults(pathToWatch, getsTheLock);
}
自此分布式信号量InterProcessSemaphoreV2 acquire方法解读完成。
InterProcessSemaphoreV2的release方法较为简单。
//在创建lease时候注册了回调方法
// Wraps a lease node path in a Lease whose close() deletes the ZNode;
// returnLease()/release ultimately frees the permit through this callback.
private Lease makeLease(final String path)
{
return new Lease()
{
@Override
public void close() throws IOException
{
try
{
// guaranteed() keeps retrying the delete in the background if it fails
client.delete().guaranteed().forPath(path);
}
catch ( KeeperException.NoNodeException e )
{
log.warn("Lease already released", e);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
throw new IOException(e);
}
}
@Override
public byte[] getData() throws Exception
{
return client.getData().forPath(path);
}
@Override
public String getNodeName() {
return ZKPaths.getNodeFromPath(path);
}
};
}
以上InterProcessSemaphoreV2核心源码解读基本完成。其设计思路基于了分布式锁与分布式信号量实现原理中的方法。
总体上InterProcessSemaphoreV2基于InterProcessMutex实现了一个公平信号量。回到提到的问题,为什么需要设置成两个zk path?
个人认为原因主要在于zk node 节点状态变动event一般都是发送给单个节点,主要为了避免羊群效应。通过lock path可以实现大家在获取许可时,只需要监听自己前面一个节点。
网友评论