美文网首页
二面被问“Zookeeper-分布式锁”,教你一招怒怼面试官

二面被问“Zookeeper-分布式锁”,教你一招怒怼面试官

作者: Java程序员石头 | 来源:发表于2021-12-09 13:28 被阅读0次

废话不多说,直接上图。

从整个流程中可以看出,zk实现分布式锁,主要是靠zk的临时顺序节点和watch机制实现的。

2. quick start

Curator 是 Netflix 公司开源的一套 zookeeper 客户端框架,解决了很多 Zookeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册 Watcher 和 NodeExistsException 异常等。

curator-recipes:封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。

2.1 引入依赖

org.springframework.bootspring-boot-starter-weborg.apache.curatorcurator-recipes5.2.0

curator-recipes中已经依赖了zookeeper和curator-framework jar,所以这里不用额外的依赖其他jar。

2.2 测试代码

测试代码其实很简单,只需要几行代码而已,初始化客户端,创建锁对象,加锁 和 释放锁。

这里先把加锁的代码注释掉,试下不加锁的情况。

packagecom.ldx.zookeeper.controller;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.apache.curator.RetryPolicy;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importorg.apache.curator.retry.ExponentialBackoffRetry;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.PostConstruct;/** * 分布式锁demo * *@authorludangxin *@date2021/9/4 */@Slf4j@RestController@RequestMapping("lock")@RequiredArgsConstructorpublicclassLockDemoController{/**

    * 库存数

    */privateInteger stock =30;/**

    * zk client

    */privatestatic CuratorFramework CLIENT;/**

    * 初始化连接信息

    */@PostConstructprivatevoidinit() {      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);      CLIENT = CuratorFrameworkFactory.builder().connectString("localhost:2181").retryPolicy(retryPolicy).build();      CLIENT.start();  }@GetMapping("buy")publicString buy() {// 可重入锁InterProcessMutex mutexLock = new InterProcessMutex(CLIENT,"/lock");try{// 加锁//        mutexLock.acquire();if(this.stock >0) {            Thread.sleep(500);this.stock--;        }        log.info("剩余库存==={}",this.stock);      }catch(Exception e) {        log.error(e.getMessage());return"no";      }finally{try{// 释放锁//            mutexLock.release();}catch(Exception e) {            log.error(e.getMessage());        }      }return"ok";  }}

2.3 启动测试

这里我们使用jemter进行模拟并发请求,当然我们这里只启动了一个server,主要是为了节约文章篇幅(启动多个server还得连接db...),能说明问题即可。

同一时刻发送一百个请求。

测试结果部分日志如下:

很明显出现了超卖了现象,并且请求是无序的(请求是非公平的)。

此时我们将注释的加锁代码打开,再进行测试。

测试结果部分日志如下:

很明显没有出现超卖的现象。

通过zk 客户端工具查看创建的部分临时节点如下:

3. 源码解析

3.1 加锁逻辑

我们再通过查看Curator加锁源码来验证下我们的加锁逻辑。

首先我们查看InterProcessMutex::acquire()方法,并且我们通过注释可以得知该方法加的锁是可重入锁。

/** * Acquire the mutex - blocking until it's available. Note: the same thread * can call acquire re-entrantly. Each call to acquire must be balanced by a call * to {@link#release()} * *@throwsException ZK errors, connection interruptions */@Overridepublicvoidacquire()throwsException{if( !internalLock(-1,null) )    {thrownewIOException("Lost connection while trying to acquire lock: "+ basePath);    }}

查看internalLock方法如下。

privatefinalConcurrentMap threadData = Maps.newConcurrentMap();privatebooleaninternalLock(longtime, TimeUnit unit)throwsException{// 获取当前线程Thread currentThread = Thread.currentThread();// 在map中查看当前线程有没有请求过LockData lockData = threadData.get(currentThread);if( lockData !=null) {// 请求过 则 +1 , 实现了锁的重入逻辑lockData.lockCount.incrementAndGet();returntrue;    }// 尝试获取锁String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if( lockPath !=null) {// 创建锁对象LockData newLockData =newLockData(currentThread, lockPath);// 添加到map中threadData.put(currentThread, newLockData);returntrue;    }returnfalse;}

我们继续查看LockInternals::attemptLock()尝试获取锁逻辑如下。

StringattemptLock(longtime, TimeUnit unit,byte[] lockNodeBytes)throwsException{finallongstartMillis = System.currentTimeMillis();finalLong      millisToWait = (unit !=null) ? unit.toMillis(time) :null;finalbyte[]    localLockNodeBytes = (revocable.get() !=null) ?newbyte[0] : lockNodeBytes;intretryCount =0;    String          ourPath =null;booleanhasTheLock =false;booleanisDone =false;while(!isDone) {// 成功标识isDone =true;try{// 创建锁ourPath = driver.createsTheLock(client, path, localLockNodeBytes);// 判断是否加锁成功hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);        }catch( KeeperException.NoNodeException e ) {// 当StandardLockInternalsDriver 找不到锁定节点时,它会抛出会话过期等情况。因此,如果重试允许,则继续循环if( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {                isDone =false;            }else{throwe;            }        }    }if(hasTheLock) {returnourPath;    }returnnull;}

在这里先查看下创建锁的逻辑StandardLockInternalsDriver::createsTheLock(),如下。

@OverridepublicStringcreatesTheLock(CuratorFramework client, String path,byte[] lockNodeBytes)throwsException{    String ourPath;// 判断有没有传znode data 我们这里为nullif(lockNodeBytes !=null) {        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);    }else{// 创建Container父节点且创建临时的顺序节点ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);    }returnourPath;}

锁创建成功后我们再查看下程序是如何加锁的LockInternals::internalLockLoop()。

privatebooleaninternalLockLoop(longstartMillis, Long millisToWait, String ourPath)throwsException{booleanhaveTheLock =false;booleandoDelete =false;try{if(revocable.get() !=null) {            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);        }// 当客户端初始化好后 且 还没有获取到锁while((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {// 获取所有的子节点 且 递增排序List        children = getSortedChildren();// 获取当前节点 pathString              sequenceNodeName = ourPath.substring(basePath.length() +1);// 获取当前锁// 1. 先判断当前节点是不是下标为0的节点,即是不是序列值最小的节点。// 2. 如果是则获取锁成功,返回成功标识。// 3. 如果不是则返回比它小的元素作为被监听的节点PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if(predicateResults.getsTheLock()) {// 获取锁成功 返回成功标识haveTheLock =true;            }else{// 索取锁失败,则获取比它小的上一个节点元素String  previousSequencePath = basePath +"/"+ predicateResults.getPathToWatch();synchronized(this) {try{// 监听比它小的上一个节点元素client.getData().usingWatcher(watcher).forPath(previousSequencePath);// 如果设置了超时,则继续判断是否超时if(millisToWait !=null) {                            millisToWait -= (System.currentTimeMillis() - startMillis);                            startMillis = System.currentTimeMillis();if(millisToWait <=0) {                                doDelete =true;break;                            }// 没有超时则 等待wait(millisToWait);                        }else{// 没有超时则 等待wait();                        }                    }catch(KeeperException.NoNodeException e) {// it has been deleted (i.e. lock released). Try to acquire again}                }            }        }    }catch(Exception e) {        ThreadUtils.checkInterrupted(e);        doDelete =true;throwe;    }finally{// 报错即删除该节点if(doDelete) {            deleteOurPath(ourPath);        }    }returnhaveTheLock;}

最后 我们再看下上段代码中提到的很关键的方法driver.getsTheLock() 即 StandardLockInternalsDriver::getsTheLock()。

@OverridepublicPredicateResultsgetsTheLock(CuratorFramework client, List children, String sequenceNodeName,intmaxLeases)throwsException{// 获取当前节点的下标 intourIndex = children.indexOf(sequenceNodeName);    validateOurIndex(sequenceNodeName, ourIndex);// 这里的maxLeases == 1,即当前节点的下标是不是0booleangetsTheLock = ourIndex < maxLeases;// 如果当前节点的下标为0,则不返回被监听的节点(因为自己已经是最小的节点了),如果不是则返回比自己小的节点作为被监听的节点。String          pathToWatch = getsTheLock ?null: children.get(ourIndex - maxLeases);// 构造返回结果returnnewPredicateResults(pathToWatch, getsTheLock);}

3.2 小节

其实加锁的源码还是比较清晰和易懂的,我们在这里再总结下。

执行InterProcessMutex::acquire()加锁方法。

InterProcessMutex::internalLock()判断当前线程是加过锁,如果加过则加锁次数+1实现锁的重入,如果没有加过锁,则调用LockInternals::attemptLock()尝试获取锁。

LockInternals::attemptLock()首先创建Container父节再创建临时的顺序节点,然后执行加锁方法LockInternals::internalLockLoop()。

LockInternals::internalLockLoop()先获取当前Container下的所有顺序子节点并且按照从小到大排序。调用StandardLockInternalsDriver::getsTheLock()方法加锁,先判断当前节点是不是最小的顺序节点,如果是则加锁成功,如果不是则返回上一个比他小的节点,最为被监听的节点。上一步加锁成功则返回true,如果失败则执行监听逻辑。

3.3 释放锁逻辑

@Overridepublicvoidrelease()throwsException{/*

        Note on concurrency: a given lockData instance

        can be only acted on by a single thread so locking isn't necessary

    */// 获取当前线程Thread currentThread = Thread.currentThread();// 查看当前线程有没有锁LockData lockData = threadData.get(currentThread);if(lockData ==null) {// 没有锁 还释放,报错thrownewIllegalMonitorStateException("You do not own the lock: "+ basePath);    }// 有锁则 锁次数 -1intnewLockCount = lockData.lockCount.decrementAndGet();// 如果锁的次数还大于0,说明还不能释放锁,因为重入的方法还未执行完if(newLockCount >0) {return;    }if(newLockCount <0) {// 锁的次数小于0,报错thrownewIllegalMonitorStateException("Lock count has gone negative for lock: "+ basePath);    }try{// 删除节点internals.releaseLock(lockData.lockPath);    }finally{// 从当前的map中移除threadData.remove(currentThread);    }}finalvoidreleaseLock(String lockPath)throwsException{    client.removeWatchers();    revocable.set(null);    deleteOurPath(lockPath);}

4. redis 和 zookeeper

Zookeeper采用临时节点和事件监听机制可以实现分布式锁,Redis主要是通过setnx命令实现分布式锁。

Redis需要不断的去尝试获取锁,比较消耗性能,Zookeeper是可以通过对锁的监听,自动获取到锁,所以性能开销较小。

另外如果获取锁的jvm出现bug或者挂了,那么只能redis过期删除key或者超时删除key,Zookeeper则不存在这种情况,连接断开节点则会自动删除,这样会即时释放锁。

这样一听感觉zk的优势还是很大的。

但是要考虑一个情况在锁并发不高的情况下 zk没有问题 如果在并发很高的情况下 zk的数据同步 可能造成锁时延较长,在选举过程中需要接受一段时间zk不可用(因为ZK 是 CP 而 redis集群是AP)。

所以说没有哪个技术是适用于任何场景的,具体用哪个技术,还是要结合当前的技术架构和业务场景做选型和取舍。

相关文章

网友评论

      本文标题:二面被问“Zookeeper-分布式锁”,教你一招怒怼面试官

      本文链接:https://www.haomeiwen.com/subject/epjffrtx.html