分布式锁的基本场景
通过分布式锁来解决多个服务节点同时访问某一个共享资源如库存归档等
用 zookeeper 来实现分布式锁
使用节点特性:某一个路径下节点名称唯一+watch机制
image.png
成功创建表示获取锁,失败则监听该节点的变化,若被删掉则其他客户端则重新创建文件。
缺点:容易产生惊群效应,即一旦释放锁,则所有的客户端一起创建文件。
利用有序节点来实现分布式锁
使用节点特性:临时有序节点+watch机制
节点序号最小的获取锁,本节点指监听上一个节点。
curator 分布式锁的基本使用
InterProcessMutex:分布式可重入排它锁
InterProcessSemaphoreMutex:分布式排它锁
InterProcessReadWriteLock:分布式读写锁
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
connectString(CONNECTION_STR).sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
curatorFramework.start();
final InterProcessMutex lock=new InterProcessMutex(curatorFramework,"/locks");
for(int i=0;i<10;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"->尝试竞争锁");
try {
lock.acquire(); //阻塞竞争锁
System.out.println(Thread.currentThread().getName()+"->成功获得了锁");
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
try {
lock.release(); //释放锁
} catch (Exception e) {
e.printStackTrace();
}
}
},"Thread-"+i).start();
}
}
Curator 实现分布式锁的基本原理
acquire
1 创建临时有序节点
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
2 序号最小的节点获取锁,其他节点则监听上一个节点和等待
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
return haveTheLock;
}
release
1 删除当前结点
private void deleteOurPath(String ourPath) throws Exception
{
try
{
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
2 执行watch即唤醒其他节点
private synchronized void notifyFromWatcher()
{
notifyAll();
}
网友评论