美文网首页
Zookeeper客户端Curator实现分布式锁及源码分析

Zookeeper客户端Curator实现分布式锁及源码分析

作者: 超_onlyu | 来源:发表于2020-11-26 09:22 被阅读0次

    API说明

    InterProcessMutex有两个构造方法

    public InterProcessMutex(CuratorFramework client, String path) {
        this(client, path, new StandardLockInternalsDriver());
    }
    
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
        this(client, path, LOCK_NAME, 1, driver);
    }
    

    参数说明

    参数 说明
    client curator中zk客户端对象
    path 抢锁路径,同一个锁path需一致
    driver 可自定义lock驱动实现分布式锁

    主要方法

    //获取锁,若失败则阻塞等待直到成功,支持重入
    public void acquire() throws Exception
    //超时获取锁,超时失败
    public boolean acquire(long time, TimeUnit unit) throws Exception
    //释放锁
    public void release() throws Exception
    

    注意:调用acquire()方法后需相应调用release()来释放锁

    使用简介

    下面的例子模拟了100个线程同时抢锁,抢锁成功的线程睡眠1秒钟后释放锁,通知其他等待的线程重新抢锁,比较简单,不多说

    public class InterprocessLock {
        static CountDownLatch countDownLatch = new CountDownLatch(10);
    
        public static void main(String[] args)  {
            CuratorFramework zkClient = getZkClient();
            String lockPath = "/lock";
            InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
            //模拟100个线程抢锁
            for (int i = 0; i < 100; i++) {
                new Thread(new TestThread(i, lock)).start();
            }
        }
    
        static class TestThread implements Runnable {
            private Integer threadFlag;
            private InterProcessMutex lock;
    
            public TestThread(Integer threadFlag, InterProcessMutex lock) {
                this.threadFlag = threadFlag;
                this.lock = lock;
            }
    
            @Override
            public void run() {
                try {
                    lock.acquire();
                    System.out.println("第"+threadFlag+"线程获取到了锁");
                    //等到1秒后释放锁
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        private static CuratorFramework getZkClient() {
            String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
            CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                    .connectString(zkServerAddress)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
            zkClient.start();
            return zkClient;
        }
    }
    

    源码分析

    从获取锁acquire()方法入手

    public void acquire() throws Exception {
        if ( !internalLock(-1, null) ) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    

    看到调用了internalLock方法,进到internalLock方法中

    private boolean internalLock(long time, TimeUnit unit) throws Exception
        {
            /*
               Note on concurrency: a given lockData instance
               can be only acted on by a single thread so locking isn't necessary
            */
    
            Thread currentThread = Thread.currentThread();
            //先判断当前线程是否持有了锁,如果是,则加锁次数count+1,返回成功
            LockData lockData = threadData.get(currentThread);
            if ( lockData != null )
            {
                // re-entering
                lockData.lockCount.incrementAndGet();
                return true;
            }
            //调用LockInternals的attemptLock()方法进行加锁
            String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
            //加锁成功,则将当前线程对应加锁数据加到map中
            if ( lockPath != null )
            {
                LockData newLockData = new LockData(currentThread, lockPath);
                threadData.put(currentThread, newLockData);
                return true;
            }
    
            return false;
        }
    

    进到LockInternals的attemptLock()中,看下代码

    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
        {
            //开始时间,后面用做超时判断
            final long      startMillis = System.currentTimeMillis();
            //超时时间,转换为毫秒
            final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
            //节点数据
            final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
            //重试次数
            int             retryCount = 0;
            //lockPath
            String          ourPath = null;
            //是否持有锁
            boolean         hasTheLock = false;
            //是否处理完成
            boolean         isDone = false;
            //循环处理
            while ( !isDone )
            {
                isDone = true;
    
                try
                {
                    //在path下创建一个EPHEMERAL_SEQUENTIAL(临时顺序型)类型节点
                    ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                    //抢锁并判断是否拥有锁
                    hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
                }
                catch ( KeeperException.NoNodeException e )
                {
                    // 重试范围内时进行重试
                    if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                    {
                        isDone = false;
                    }
                    else
                    {
                        throw e;
                    }
                }
            }
    
            if ( hasTheLock )
            {
                return ourPath;
            }
    
            return null;
        }
    

    创建临时有序节点createsTheLock方法如下,比较简单

    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
        {
            String ourPath;
            if ( lockNodeBytes != null )
            {
                ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
            }
            else
            {
                ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
            }
            return ourPath;
        }
    

    判断是否拥有锁的方法internalLockLoop才是核心,下面注意了

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
        {
            boolean     haveTheLock = false;
            boolean     doDelete = false;
            try
            {
                if ( revocable.get() != null )
                {
                    client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
                }
                //自旋
                while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
                {
                    //获取path下对应临时有序节点,并按节点编号从小到大排序
                    List<String>        children = getSortedChildren();
                    //获取当前线程创建的临时节点名称
                    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                    //判断当前节点编号是否<maxLease,若是,则抢到了锁,maxLease这里为1,所以只有index为0时才抢到锁,标识只有1个线程能抢到锁
                    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    if ( predicateResults.getsTheLock() )
                    {
                        haveTheLock = true;
                    }
                    else
                    {
                        //前一个节点编号较小的节点的路径
                        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
    
                        synchronized(this)
                        {
                            try 
                            {
                                // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                                 //如果没抢到锁,监听前一个节点事件
                                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                                if ( millisToWait != null )
                                {
                                    判断是否超时
                                    millisToWait -= (System.currentTimeMillis() - startMillis);
                                    startMillis = System.currentTimeMillis();
                                    if ( millisToWait <= 0 )
                                    {
                                        //超时 直接退出,并标记 删除节点doDelete标记=true
                                        doDelete = true;    // timed out - delete our node
                                        break;
                                    }
    
                                    wait(millisToWait);
                                }
                                else
                                {
                                    //调用Object.wait(),等待线程被notify唤醒
                                    wait();
                                }
                            }
                            catch ( KeeperException.NoNodeException e ) 
                            {
                                // it has been deleted (i.e. lock released). Try to acquire again
                            }
                        }
                    }
                }
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                doDelete = true;
                throw e;
            }
            finally
            {
                //如果标记了删除,删除节点数据
                if ( doDelete )
                {
                    deleteOurPath(ourPath);
                }
            }
            return haveTheLock;
        }
    

    可以看到逻辑比较清晰,N个线程同时在path下创建临时顺序节点,编号最小的获取锁,没抢到锁的会调用wait()方法等待被唤醒
    那么是在哪里调用了notify()方法来唤醒其他节点的呢?
    答案是在监听器wacher里,该监听器会在前一个(节点编号较小)的节点被删除后触发
    先分析下释放锁的方法release
    看下源码

    public void release() throws Exception
        {
            /*
                Note on concurrency: a given lockData instance
                can be only acted on by a single thread so locking isn't necessary
             */
    
            Thread currentThread = Thread.currentThread();
            LockData lockData = threadData.get(currentThread);
            if ( lockData == null )
            {
                throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
            }
            //如果锁被当前线程获取了超过1次,将count-1,直接返回
            int newLockCount = lockData.lockCount.decrementAndGet();
            if ( newLockCount > 0 )
            {
                return;
            }
            if ( newLockCount < 0 )
            {
                throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
            }
            try
            {
                //释放锁
                internals.releaseLock(lockData.lockPath);
            }
            finally
            {
                threadData.remove(currentThread);
            }
        }
    

    最终调用releaseLock方法中的deleteOurPath中

    void releaseLock(String lockPath) throws Exception
        {
            revocable.set(null);
            deleteOurPath(lockPath);
        }
        
        private void deleteOurPath(String ourPath) throws Exception
        {
            try
            {
            //直接调用client删除节点
                client.delete().guaranteed().forPath(ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // ignore - already deleted (possibly expired session, etc.)
            }
        }
    

    节点被删除后,会触发抢锁过程中的wather监听器,看下监听器中内容

    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            notifyFromWatcher();
        }
    };
    private synchronized void notifyFromWatcher() {
            notifyAll();
    }
    

    可以看到节点path被删除后,会通知后面一个节点进行notify操作,notify操作后,重新进入while自旋中,重新判断是否抢到了锁

    最后看下getTheLock

    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)
                throws Exception {
            // 之前创建的临时顺序节点在排序后的子节点列表中的索引
            int ourIndex =
                    children.indexOf(sequenceNodeName);
            // 校验之前创建的临时顺序节点是否有效
            validateOurIndex(sequenceNodeName,
                    ourIndex);
            // 锁公平性的核心逻辑
            // 由 InterProcessMutex 的构造函数可知, maxLeases 为 1,即只有 ourIndex 为 0 时,线程才能持有锁,或者说该线程创建的临时顺序节点激活了锁
            // Zookeeper 的临时顺序节点特性能保证跨多个 JVM 的线程并发创建节点时的顺序性,越早创建临时顺序节点成功的线程会更早地激活锁或获得锁
            boolean getsTheLock = ourIndex <
                    maxLeases;
            // 如果已经获得了锁,则无需监听任何节点,否则需要监听上一顺序节点(ourIndex - 1)
            // 因 为 锁 是 公 平 的 , 因 此 无 需 监 听 除 了(ourIndex - 1)以外的所有节点,这是为了减少羊群效应, 非常巧妙的设计!!
            String pathToWatch = getsTheLock ? null :
                    children.get(ourIndex - maxLeases);
            // 返回获取锁的结果,交由上层继续处理(添加监听等操作)
            return new PredicateResults(pathToWatch,
                    getsTheLock);
        }
    
        static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
            if (ourIndex < 0) {
                // 容错处理,可跳过
                // 由于会话过期或连接丢失等原因,该线程创建的临时顺序节点被 Zookeeper 服务端删除,往外抛出 NoNodeException
                // 如果在重试策略允许范围内,则进行重新尝试获取锁,这会重新重新生成临时顺序节点
                // 佩服 Curator 的作者将边界条件考虑得 如此周到!
                throw new KeeperException.NoNodeException("Sequential path  not found:" + sequenceNodeName);
            }
        }
    

    相关文章

      网友评论

          本文标题:Zookeeper客户端Curator实现分布式锁及源码分析

          本文链接:https://www.haomeiwen.com/subject/kjhaiktx.html