美文网首页
InterProcessSemaphoreV2核心源码解读(分布

InterProcessSemaphoreV2核心源码解读(分布

作者: mawu_1014 | 来源:发表于2019-07-27 23:43 被阅读0次

    InterProcessSemaphoreV2是Curator提供的关键工具之一,通过InterProcessSemaphoreV2的API,可以在分布式场景中实现类似于在JVM中操作Semaphore的功能,具有公平,可重入等多个特性。


    tips1: 阅读本节,需要简单了解ZK及Curator的简单知识,任意门:Curator简单介绍
    tips2: 阅读本节,建议先阅读分布式锁与分布式信号量实现原理
    tips3:分布式信号量中,每个许可被称为lease


    下面我们开始阅读源码,看下InterProcessSemaphoreV2如何在分布式系统下实现了信号量。

    创建信号量对象

    /**
     * Minimal demo wrapper around Curator's InterProcessSemaphoreV2.
     *
     * It also registers itself as the SharedCountListener of the SharedCount
     * that is handed to the semaphore, so it is called back when the count
     * or the connection state changes.
     */
    public class ZkSempahore implements SharedCountListener{
        private static final String path = "/semahore/test";
        private InterProcessSemaphoreV2 interProcessSemaphoreV2;
        
    
        /**
         * Initialization: creates and starts the client, then builds the semaphore.
         *
         * @throws Exception if starting the shared count fails
         */
        public void init() throws Exception{
            // The parameters required to build the ZK client are omitted here.
            CuratorFramework client = CuratorFrameworkFactory.newClient();
            // The client must be started before it is used: internalAcquire1Lease
            // returns RETURN_NULL whenever the client state is not STARTED.
            client.start();
            // Shared count seeded with 5; it is passed to the semaphore below.
            SharedCount count = new SharedCount(client, path, 5);
            count.addListener(this);
            count.start();
            interProcessSemaphoreV2 = new InterProcessSemaphoreV2(client, path,count);
        }

        /**
         * Acquires one lease from the semaphore.
         *
         * @return the acquired lease
         * @throws Exception propagated from the semaphore
         */
        public Lease acquire() throws Exception{
            return interProcessSemaphoreV2.acquire();
        }
        
        /**
         * Returns a previously acquired lease to the semaphore.
         *
         * @param lease the lease to give back
         */
        public void release(Lease lease) {
            interProcessSemaphoreV2.returnLease(lease);
        }
    
        @Override
        public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
            // Called back when the count value changes.
        }
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // Called back when the client connection state changes.
        }
    }
    

    acquire方法解析

    /**
     * Acquires {@code qty} leases, one at a time, through internalAcquire1Lease.
     *
     * A null {@code unit} means no wait limit is applied (hasWait is false).
     * If the method exits without having set {@code success} (for example
     * because an exception was thrown), every lease collected so far is given
     * back via returnAll.
     *
     * NOTE(review): the body of the switch is elided ("....") in this excerpt,
     * so the handling of each InternalAcquireResult, and the use of retryCount
     * and startMillis, is not visible here.
     *
     * @param qty  number of leases to acquire; must be greater than 0
     * @param time maximum time to wait, in {@code unit}
     * @param unit unit of {@code time}, or null
     * @return the leases collected in the builder
     * @throws Exception propagated from the underlying acquisition
     */
    public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
        {
            long startMs = System.currentTimeMillis();
            boolean hasWait = (unit != null);
            long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
    
            Preconditions.checkArgument(qty > 0, "qty cannot be 0");
    
            ImmutableList.Builder<Lease> builder = ImmutableList.builder();
            boolean success = false;
            try
            {
                // One iteration per requested lease.
                while ( qty-- > 0 )
                {
                    int retryCount = 0;
                    long startMillis = System.currentTimeMillis();
                    boolean isDone = false;
                    while ( !isDone )
                    {
                        // Key step: try to acquire a single lease
                        switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
                        {
                            ....
                        }
                    }
                }
                success = true;
            }
            finally
            {
                // If acquisition did not complete, release every lease obtained so far
                if ( !success )
                {
                    returnAll(builder.build());
                }
            }
            return builder.build();
        }
    

    acquire方法中真正有实际意义的代码是对internalAcquire1Lease的调用,它负责尝试获取一个lease;qty表示本次要获取的lease个数,qty=1表示线程只获取一个lease。
    进入internalAcquire1Lease看实现

    /**
     * Tries to acquire a single lease and, on success, adds it to {@code builder}.
     *
     * The work is done in two stages: first the lock is acquired, then a node
     * is created under the lease path and the method waits until the number of
     * children under that path is within maxLeases. The lock is always released
     * in the finally block.
     *
     * @param builder collector that receives the lease on success
     * @param startMs start time of the overall acquire call, in milliseconds
     * @param hasWait whether a wait limit applies
     * @param waitMs  total wait budget in milliseconds (used only when hasWait)
     * @return CONTINUE when a lease was added, RETURN_NULL when the client is
     *         not started or the wait budget ran out, or
     *         RETRY_DUE_TO_MISSING_NODE when the created node is not listed
     * @throws Exception propagated from the lock or from the ZooKeeper calls
     */
    private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
        {
    
            // If the client is not in the STARTED state, give up with RETURN_NULL
            if ( client.getState() != CuratorFrameworkState.STARTED )
            {
                return InternalAcquireResult.RETURN_NULL;
            }
    
            if ( hasWait )
            {
                // Timed variant: try the lock for thisWaitMs only
                // (presumably the time remaining out of waitMs — see getThisWaitMs)
                long thisWaitMs = getThisWaitMs(startMs, waitMs);
                if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
                {
                    return InternalAcquireResult.RETURN_NULL;
                }
            }
            else
            {
                // Key step: acquire the lock; blocks until it is obtained
                lock.acquire();
            }
            Lease lease = null;
    
            try
            {
                PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
                // Create this acquirer's node under the lease path (with nodeData when it is set)
                String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
                String nodeName = ZKPaths.getNodeFromPath(path);
                lease = makeLease(path);
    
                if ( debugAcquireLatch != null )
                {
                    debugAcquireLatch.await();
                }
                // Hold this object's monitor; the wait()/wait(ms) calls below require it
                synchronized(this)
                {
                    // Loop until we break (lease granted), return, or throw
                    for(;;)
                    {
                        List<String> children;
                        try
                        {   // List the children under the lease path, registering the watcher
                            children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
                        }
                        catch ( Exception e )
                        {
                            if ( debugFailedGetChildrenLatch != null )
                            {
                                debugFailedGetChildrenLatch.countDown();
                            }
                            returnLease(lease); // otherwise the just created ZNode will be orphaned causing a dead lock
                            throw e;
                        }
                        // The node we just created is missing from the child list: give the lease back and ask for a retry
                        if ( !children.contains(nodeName) )
                        {
                            log.error("Sequential path not found: " + path);
                            returnLease(lease);
                            return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                        }
                        // The number of nodes under the lease path does not exceed maxLeases: the lease is ours
                        if ( children.size() <= maxLeases )
                        {
                            break;
                        }
                        // Otherwise wait and re-check; with a wait limit, give up once the budget is used up
                        if ( hasWait )
                        {
                            long thisWaitMs = getThisWaitMs(startMs, waitMs);
                            if ( thisWaitMs <= 0 )
                            {
                                returnLease(lease);
                                return InternalAcquireResult.RETURN_NULL;
                            }
                            wait(thisWaitMs);
                        }
                        else
                        {
                            // NOTE(review): presumably woken by the watcher registered above — the notify is not shown here
                            wait();
                        }
                    }
                }
            }
            finally
            {
                // Release the lock acquired at the top of this method.
                // This matters on every exit path (success, early return, exception).
                lock.release();
            }
            // Hand the lease back through the builder
            builder.add(Preconditions.checkNotNull(lease));
            return InternalAcquireResult.CONTINUE;
        }
    

    此处有几处需要注意

    1. 获取许可的操作被分为两部分
      1.1 首先通过lock.acquire()方法去得到创建lock path 下临时节点的权利,否则会阻塞;
      1.2 成功获取lock后需要在lease path下创建临时节点,并判断lease path下临时节点个数不超过令牌个数(children.size() <= maxLeases),以此实现信号量。
      可以认为第二步是实现分布式信号量的关键
    2. 通过finally中的lock.release() 释放在1.1中创建的lock path下临时节点。这一步的必要性在于:无论是抛出异常还是成功获取lease,都需要释放lock path下的临时节点,让其他客户端解除阻塞,获得继续获取lease的机会

    也许有的朋友会问,为什么要将分布式信号量设计为两步,同时维护lock path和lease path。只维护一个lease path难道不可以吗?后面我会解答这个问题。

    下面首先进入第一步,去获取lock,进入lock.acquire()方法内部。InterProcessSemaphoreV2内部使用了InterProcessMutex,InterProcessMutex其实就是分布式锁与分布式信号量实现原理中分布式锁的实现方案,简单而言,它实现了一个公平的可重入分布式锁。

      /**
       * Acquires the lock for the calling thread, with re-entrancy.
       *
       * threadData maps each thread to its LockData. A thread that already has
       * an entry only gets its lock count incremented; otherwise the lock is
       * requested through internals.attemptLock and, on success, recorded.
       *
       * @param time maximum time to wait, forwarded to attemptLock
       * @param unit unit of {@code time}, forwarded to attemptLock
       * @return true if the lock is held on return, false if attemptLock
       *         returned no lock path
       * @throws Exception propagated from attemptLock
       */
      private boolean internalLock(long time, TimeUnit unit) throws Exception {
          final Thread self = Thread.currentThread();

          // Re-entrant case: this thread already has lock data, so just count the extra hold.
          final LockData held = threadData.get(self);
          if ( held != null ) {
              held.lockCount.incrementAndGet();
              return true;
          }

          // First acquisition by this thread: go after the lock itself.
          final String acquiredPath = internals.attemptLock(time, unit, getLockNodeBytes());
          if ( acquiredPath == null ) {
              return false;
          }

          // Success: remember the lock path for this thread.
          threadData.put(self, new LockData(self, acquiredPath));
          return true;
      }
    }
    

    上面这段代码实现的主要功能是可重入,进入attemptLock,终于可以接近关键的信息了。

     /**
      * Creates this caller's lock node and then runs internalLockLoop to find
      * out whether the lock was obtained.
      *
      * When a KeeperException.NoNodeException is thrown, the whole attempt is
      * repeated as long as the client's retry policy allows it; otherwise the
      * exception is rethrown.
      *
      * @param time          maximum time to wait
      * @param unit          unit of {@code time}; null means no wait limit
      *                      (millisToWait is then null)
      * @param lockNodeBytes data for the lock node; replaced by an empty array
      *                      when revocable is set
      * @return the path of our lock node if the lock was obtained, otherwise null
      * @throws Exception propagated from node creation or the lock loop
      */
     String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
        {
            final long      startMillis = System.currentTimeMillis();
            final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
            final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
            int             retryCount = 0;
    
            String          ourPath = null;
            boolean         hasTheLock = false;
            boolean         isDone = false;
            while ( !isDone )
            {
                // Assume a single pass; only the NoNodeException retry below resets this
                isDone = true;
                try
                {   
                    // First create a node based on the lock path. Every caller creates its node
                    // up front, before it is known whether it will get the lock
                    ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                    // Determine whether this client obtained the lock
                    hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
                }
                catch ( KeeperException.NoNodeException e )
                {
                    // Retry the whole attempt if the retry policy permits, else propagate
                    if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                    {
                        isDone = false;
                    }
                    else
                    {
                        throw e;
                    }
                }
            }
    
            if ( hasTheLock )
            {
                return ourPath;
            }
            return null;
        }
    

    上面这段代码,首先为本次申请创建了一个临时node。

     /**
      * Loops until our node obtains the lock, the wait budget runs out, the
      * client leaves the STARTED state, or an exception is thrown.
      *
      * While the lock is not ours, a watcher is set on the node returned by
      * getPathToWatch and the thread waits on this object's monitor. On timeout
      * or on an exception, our own node is deleted in the finally block.
      *
      * @param startMillis  time the attempt started, in milliseconds
      * @param millisToWait remaining wait budget in milliseconds, or null for
      *                     no limit
      * @param ourPath      path of the node created for this attempt
      * @return true if the lock was obtained
      * @throws Exception any exception from the ZooKeeper calls or the wait
      *                   (NoNodeException on the watched node is handled
      *                   by looping again)
      */
     private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
        {
            boolean     haveTheLock = false;
            boolean     doDelete = false;
            try
            {
                if ( revocable.get() != null )
                {
                    client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
                }
    
                while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
                {
                    // Fetch the names of all nodes under the lock path, sorted by sequence number
                    // NOTE(review): presumably ascending, since getsTheLock treats the lowest indexes as holders — confirm in getSortedChildren
                    List<String>        children = getSortedChildren();
                    // Name of the node created for this acquire, relative to basePath
                    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                    // Decide whether we hold the lock
                    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    if ( predicateResults.getsTheLock() )
                    {
                        // Lock obtained
                        haveTheLock = true;
                    }
                    else    
                    {   // Full path of the node we have to wait on
                        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                        synchronized(this)
                        {
                            try 
                            {
                                // Watch the node ahead of us
                                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                                if ( millisToWait != null )
                                {
                                    // Charge the elapsed time against the budget and restart the clock
                                    millisToWait -= (System.currentTimeMillis() - startMillis);
                                    startMillis = System.currentTimeMillis();
                                    if ( millisToWait <= 0 )
                                    {
                                        doDelete = true;    // timed out - delete our node
                                        break;
                                    }
    
                                    wait(millisToWait);
                                }
                                else
                                {
                                    wait();
                                }
                            }
                            catch ( KeeperException.NoNodeException e ) 
                            {
                                // it has been deleted (i.e. lock released). Try to acquire again
                            }
                        }
                    }
                }
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                doDelete = true;
                throw e;
            }
            finally
            {
                if ( doDelete )
                {   // On timeout or exception, delete our own node
                    deleteOurPath(ourPath);
                }
            }
            return haveTheLock;
        }
    

    以上这部分是较为核心代码,其设计思路如同分布式锁与分布式信号量实现原理介绍的,首先判断自己是否获取锁,没有的话就监听并wait前一个node。进入getsTheLock方法内部

    /**
     * Decides whether the node named {@code sequenceNodeName} holds the lock,
     * based on its position in {@code children}.
     *
     * The node holds the lock when its index is below {@code maxLeases};
     * otherwise the result names the child located {@code maxLeases} positions
     * ahead of it as the node to watch.
     * NOTE(review): for a plain mutex maxLeases is presumably 1, making the
     * first child the holder — confirm against the caller.
     *
     * @param client           the Curator client (not used in this method)
     * @param children         child node names of the lock path, in order
     * @param sequenceNodeName name of our own node
     * @param maxLeases        number of leading positions that count as holding the lock
     * @return the decision, plus the node to watch when the lock is not held
     * @throws Exception propagated from validateOurIndex
     */
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
        // Where our node sits among the ordered children.
        final int position = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, position);

        // Within the first maxLeases positions: lock obtained, nothing to watch.
        if ( position < maxLeases )
        {
            return new PredicateResults(null, true);
        }

        // Otherwise report the node ahead of us that has to go away first.
        final String watchTarget = children.get(position - maxLeases);
        return new PredicateResults(watchTarget, false);
    }
    

    至此分布式信号量InterProcessSemaphoreV2 acquire方法解读完成。
    InterProcessSemaphoreV2的release方法较为简单。

     /**
      * Builds the Lease handle for the lease node at {@code path}.
      *
      * The returned object carries the release logic: closing it deletes the
      * node, so the callbacks are wired up when the lease is created.
      *
      * @param path full path of the lease node
      * @return a Lease bound to that node
      */
     private Lease makeLease(final String path) {
         return new Lease() {
             @Override
             public String getNodeName() {
                 return ZKPaths.getNodeFromPath(path);
             }

             @Override
             public byte[] getData() throws Exception {
                 return client.getData().forPath(path);
             }

             /**
              * Releases the lease by deleting its node. A node that is
              * already gone is only logged; any other failure is rethrown
              * wrapped in an IOException.
              */
             @Override
             public void close() throws IOException {
                 try {
                     client.delete().guaranteed().forPath(path);
                 } catch ( KeeperException.NoNodeException alreadyGone ) {
                     log.warn("Lease already released", alreadyGone);
                 } catch ( Exception failure ) {
                     ThreadUtils.checkInterrupted(failure);
                     throw new IOException(failure);
                 }
             }
         };
     }
    

    以上InterProcessSemaphoreV2核心源码解读基本完成。其设计思路基于了分布式锁与分布式信号量实现原理中的方法。
    总体上InterProcessSemaphoreV2基于InterProcessMutex实现了一个公平信号量(从ZK的视角看,lease基本按请求顺序发放)。回到前面提到的问题,为什么需要设置成两个zk path?
    个人觉得原因主要在于避免羊群效应:希望zk node节点状态变动的event只发送给单个等待者。如果所有等待者都直接监听lease path,每次有lease被释放,所有等待的客户端都会同时被唤醒并重新竞争;而通过lock path,大家在获取许可时先排队,每个客户端只需要监听排在自己前面的那一个节点。

    相关文章

      网友评论

          本文标题:InterProcessSemaphoreV2核心源码解读(分布

          本文链接:https://www.haomeiwen.com/subject/kvbyrctx.html