美文网首页
Zookeeper 实现分布式原理及源码分析

Zookeeper 实现分布式原理及源码分析

作者: taj3991 | 来源:发表于2020-02-19 17:27 被阅读0次

    Zookeeper分布式锁的原理

    Zookeeper分布式锁恰恰应用了临时顺序节点。具体如何实现呢?让我们来看一看详细步骤:

    获取锁

    首先,在Zookeeper当中创建一个持久节点ParentLock。当第一个客户端想要获得锁时,需要在ParentLock这个节点下面创建一个临时顺序节点 Lock1。

    之后,Client1查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock1是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。

    这时候,如果再有一个客户端 Client2 前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock2。


    Client2查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。

    于是,Client2向排序仅比它靠前的节点Lock1注册Watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态。

    这时候,如果又有一个客户端Client3前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock3。

    Client3查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的。

    于是,Client3向排序仅比它靠前的节点Lock2注册Watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态。

    这样一来,Client1得到了锁,Client2监听了Lock1,Client3监听了Lock2。这恰恰形成了一个等待队列,很像是Java当中ReentrantLock所依赖的AQS(AbstractQueuedSynchronizer)。

    获得锁的过程大致就是这样,那么Zookeeper如何释放锁呢?

    释放锁的过程很简单,只需要释放对应的子节点就好。

    释放锁

    释放锁分为两种情况:

    1.任务完成,客户端显示释放

    当任务完成时,Client1会显示调用删除节点Lock1的指令。

    2.任务执行过程中,客户端崩溃

    获得锁的Client1在任务执行过程中,如果Duang的一声崩溃,则会断开与Zookeeper服务端的链接。根据临时节点的特性,相关联的节点Lock1会随之自动删除。

    由于Client2一直监听着Lock1的存在状态,当Lock1节点被删除,Client2会立刻收到通知。这时候Client2会再次查询ParentLock下面的所有节点,确认自己创建的节点Lock2是不是目前最小的节点。如果是最小,则Client2顺理成章获得了锁。

    同理,如果Client2也因为任务完成或者节点崩溃而删除了节点Lock2,那么Client3就会接到通知。

    最终,Client3成功得到了锁。

    使用Zookeeper实现分布式锁的大致流程就是这样。那么使用Zookeeper实现的分布式锁和Redis实现的分布式锁相比有什么优势和劣势呢?

    下面总结一下他们各自的优劣。

    有人说Zookeeper实现的分布式锁支持可重入,Redis实现的分布式锁不支持可重入,这是错误的观点。两者都可以在客户端实现可重入逻辑。

    Zookeeper分布式锁源码分析

    curator 分布式锁的基本使用

    curator 对于锁这块做了一些封装,curator 提供了InterProcessMutex 这样一个 api。除了分布式锁之外,还提供了 leader 选举、分布式队列等常用的功能。

    • InterProcessMutex:分布式可重入排它锁
    • InterProcessSemaphoreMutex:分布式排它锁
    • InterProcessReadWriteLock:分布式读写锁
    
    public class LockDemo {
    
        private static String CONNECTION_STR = "192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";
    
        public static void main(String[] args) throws Exception {
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
                    connectString(CONNECTION_STR).sessionTimeoutMs(5000).
                    retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            curatorFramework.start();
    
            final InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/locks");
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + "->尝试竞争锁");
                    try {
                        lock.acquire(); //阻塞竞争锁
    
                        System.out.println(Thread.currentThread().getName() + "->成功获得了锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    try {
                        Thread.sleep(4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            lock.release(); //释放锁
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }, "Thread-" + i).start();
            }
        }
    }
    

    Curator 实现分布式锁的基本原理

    我们先打开InterProcessMutex构造函数看看源码

       // 最常用
        public InterProcessMutex(CuratorFramework client,
                                 String path){
            // Zookeeper 利用 path 创建临时顺序节点,实现公平锁的核心
            this(client, path, new StandardLockInternalsDriver());
        }
        public InterProcessMutex(CuratorFramework client,
                                 String path, LockInternalsDriver driver){
            // maxLeases=1,表示可以获得分布式锁的线程数量(跨 JVM)为 1,即为互斥锁
            this(client, path, LOCK_NAME, 1, driver);
        }
    
        // protected 构造函数
        InterProcessMutex(CuratorFramework client, String
                path, String lockName, int maxLeases,
                          LockInternalsDriver driver){
            basePath = PathUtils.validatePath(path);
            // internals 的类型为 LockInternals ,
            InterProcessMutex 将分布式锁的申请和释放操作委托给internals 执行
            internals = new LockInternals(client, driver, path,
                    lockName, maxLeases);
        }
    
    

    再跟进InterProcessMutex.acquire方法

        // 无限等待
        public void acquire() throws Exception {
            if (!internalLock(-1, null)) {
                throw new IOException("Lost connection while trying to acquire lock:" + basePath);
            }
        }
    
        // 限时等待
        public boolean acquire(long time, TimeUnit unit)
                throws Exception {
            return internalLock(time, unit);
        }
    

    InterProcessMutex.internalLock

     private boolean internalLock(long time, TimeUnit unit) throws Exception {
            Thread currentThread =
                    Thread.currentThread();
            LockData lockData =
                    threadData.get(currentThread);
            if (lockData != null) {
                // 实现可重入
                // 同一线程再次 acquire,首先判断当前的映射表内(threadData)是否有该线程的锁信息,如果有则原子 + 1,然后返回
                lockData.lockCount.incrementAndGet();
                return true;
            }
            // 映射表内没有对应的锁信息,尝试通过LockInternals 获取锁
            String lockPath = internals.attemptLock(time,unit, getLockNodeBytes());
            if (lockPath != null) {
                // 成功获取锁,记录信息到映射表
                LockData newLockData = new
                        LockData(currentThread, lockPath);
                threadData.put(currentThread,
                        newLockData);
                return true;
            }
            return false;
        }
    
        // 映射表
        // 记录线程与锁信息的映射关系
        private final ConcurrentMap<Thread, LockData>  threadData = Maps.newConcurrentMap();
        // 锁信息
        // Zookeeper 中一个临时顺序节点对应一个“锁”,但让锁生效激活需要排队(公平锁),下面会继续分析
    
        private static class LockData {
            final Thread owningThread;
            final String lockPath;
            final AtomicInteger lockCount = new
                    AtomicInteger(1); // 分布式锁重入次数
    
            private LockData(Thread owningThread,
                             String lockPath) {
                this.owningThread = owningThread;
                this.lockPath = lockPath;
            }
        }
    

    LockInternals.attemptLock

     // 尝试获取锁,并返回锁对应的 Zookeeper 临时顺序节点的路径
        String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
            final long startMillis = System.currentTimeMillis();
            // 无限等待时,millisToWait 为 null
            final Long millisToWait = (unit != null) ?
                    unit.toMillis(time) : null;
            // 创建 ZNode 节点时的数据内容,无关紧要,这里为 null,采用默认值(IP 地址)
            final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
            // 当前已经重试次数,与CuratorFramework的重试策略有关
            int retryCount = 0;
            // 在 Zookeeper 中创建的临时顺序节点的路径,相当于一把待激活的分布式锁
            // 激活条件:同级目录子节点,名称排序最小(排队,公平锁),后续继续分析
            String ourPath = null;
            // 是否已经持有分布式锁
            boolean hasTheLock = false;
            // 是否已经完成尝试获取分布式锁的操作
            boolean isDone = false;
            while (!isDone) {
                isDone = true;
                try {
                    // 从 InterProcessMutex 的构造函数可知实际 driver 为 StandardLockInternalsDriver 的实例
                    // 在Zookeeper中创建临时顺序节点
                    ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                    // 循环等待来激活分布式锁,实现锁的公平性
                    hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
                } catch
                (KeeperException.NoNodeException e) {
                    // 容错处理,不影响主逻辑的理解,可跳过
                    // 因 为 会 话 过 期 等 原 因 ,StandardLockInternalsDriver 因为无法找到创建的临时 顺序节点而抛出 NoNodeException 异常
                    if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
                            System.currentTimeMillis() -
                                    startMillis, RetryLoop.getDefaultRetrySleeper())) {
                        // 满足重试策略尝试重新获取锁
                        isDone = false;
                    } else {
                        // 不满足重试策略则继续抛出NoNodeException
                        throw e;
                    }
                }
            }
            if (hasTheLock) {
                // 成功获得分布式锁,返回临时顺序节点的路径,上层将其封装成锁信息记录在映射表,方便锁重入
                return ourPath;
            }
            // 获取分布式锁失败,返回 null
            return null;
        }
    

    createsTheLock

      // From StandardLockInternalsDriver
        // 在 Zookeeper 中创建临时顺序节点
        public String createsTheLock(CuratorFramework
                                             client, String path, byte[] lockNodeBytes) throws
                Exception {
            String ourPath;
            // lockNodeBytes 不为 null 则作为数据节点内容,否则采用默认内容(IP 地址)
            if (lockNodeBytes != null) {
                // 下面对 CuratorFramework 的一些细节做解释,不影响对分布式锁主逻辑的解释,可跳过
                // creatingParentContainersIfNeeded:用于创建父节点,如果不支持 CreateMode.CONTAINER
                // 那么将采用 CreateMode.PERSISTENT
                // withProtection:临时子节点会添加GUID前缀
                ourPath = client.create().creatingParentContainersIfNeeded()
                        //CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点,Zookeeper 能保证在节点产生的顺序性
                        // 依据顺序来激活分布式锁,从而也实现了分布式锁的公平性,后续继续分析
                        .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
            } else {
                ourPath =
                        client.create().creatingParentContainersIfNeeded()
                                .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
            }
            return ourPath;
        }
    

    LockInternals.internalLockLoop

     // 循环等待来激活分布式锁,实现锁的公平性
        private boolean internalLockLoop(long startMillis,
                                         Long millisToWait, String ourPath) throws Exception {
            // 是否已经持有分布式锁
            boolean haveTheLock = false;
            // 是否需要删除子节点
            boolean doDelete = false;
            try {
                if (revocable.get() != null) {
                    client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
                }
                while ((client.getState() ==
                        CuratorFrameworkState.STARTED) && !haveTheLock) {
                    // 获取排序后的子节点列表
                    List<String> children = getSortedChildren();
                    // 获取前面自己创建的临时顺序子节点的名称
                    String sequenceNodeName = ourPath.substring(basePath.length() + 1);
                    // 实现锁的公平性的核心逻辑,看下面的分析
                    PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    if (predicateResults.getsTheLock()) {
                        // 获得了锁,中断循环,继续返回上层
                        haveTheLock = true;
                    } else {
                        // 没有获得到锁,监听上一临时顺序节点
                        String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                        synchronized (this) {
                            try {
                                // exists()会导致导致资源泄漏,因此 exists () 可以监听不存在的 ZNode,因此采用 getData ()
                                // 上一临时顺序节点如果被删除,会唤醒当前线程继续竞争锁,正常情况下能直接获得锁,因为锁是公平的
    
                                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                                if (millisToWait != null) {
                                    millisToWait -=
                                            (System.currentTimeMillis() - startMillis);
                                    startMillis =
                                            System.currentTimeMillis();
                                    if (millisToWait <=
                                            0) {
                                        doDelete =
                                                true; // 获取锁超时,标记删除之前创建的临时顺序节点
                                        break;
                                    }
                                    wait(millisToWait);
                                    // 等待被唤醒,限时等待
                                } else {
                                    wait(); // 等待被唤醒,无限等待
                                }
                            } catch
                            (KeeperException.NoNodeException e) {
                                // 容错处理,逻辑稍微有点绕,可跳过,不影响主逻辑的理解
                                // client.getData()可能调用时抛出 NoNodeException,原因可能是锁被释放或会话过期(连接丢失)等
                                // 这里并没有做任何处理,因为外层是 while 循环,再次执行 driver.getsTheLock 时会调用 validateOurIndex
                                // 此 时 会 抛 出NoNodeException,从而进入下面的 catch 和 finally 逻辑,重新抛出上层尝试重试获取锁并删除临时顺序节点
                            }
                        }
                    }
                }
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                // 标记删除,在 finally 删除之前创建的临时顺序节点(后台不断尝试)
                doDelete = true;
                // 重新抛出,尝试重新获取锁
                throw e;
            } finally {
                if (doDelete) {
                    deleteOurPath(ourPath);
                }
            }
            return haveTheLock;
        }
    

    getTheLock

    // From StandardLockInternalsDriver
        public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)
                throws Exception {
            // 之前创建的临时顺序节点在排序后的子节点列表中的索引
            int ourIndex =
                    children.indexOf(sequenceNodeName);
            // 校验之前创建的临时顺序节点是否有效
            validateOurIndex(sequenceNodeName,
                    ourIndex);
            // 锁公平性的核心逻辑
            // 由 InterProcessMutex 的构造函数可知, maxLeases 为 1,即只有 ourIndex 为 0 时,线程才能持有锁,或者说该线程创建的临时顺序节点激活了锁
            // Zookeeper 的临时顺序节点特性能保证跨多个 JVM 的线程并发创建节点时的顺序性,越早创建临时顺序节点成功的线程会更早地激活锁或获得锁
            boolean getsTheLock = ourIndex <
                    maxLeases;
            // 如果已经获得了锁,则无需监听任何节点,否则需要监听上一顺序节点(ourIndex - 1)
            // 因 为 锁 是 公 平 的 , 因 此 无 需 监 听 除 了(ourIndex - 1)以外的所有节点,这是为了减少羊群效应, 非常巧妙的设计!!
            String pathToWatch = getsTheLock ? null :
                    children.get(ourIndex - maxLeases);
            // 返回获取锁的结果,交由上层继续处理(添加监听等操作)
            return new PredicateResults(pathToWatch,
                    getsTheLock);
        }
    
        static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
            if (ourIndex < 0) {
                // 容错处理,可跳过
                // 由于会话过期或连接丢失等原因,该线程创建的临时顺序节点被 Zookeeper 服务端删除,往外抛出 NoNodeException
                // 如果在重试策略允许范围内,则进行重新尝试获取锁,这会重新重新生成临时顺序节点
                // 佩服 Curator 的作者将边界条件考虑得 如此周到!
                throw new KeeperException.NoNodeException("Sequential path  not found:" + sequenceNodeName);
            }
        }
    

    释放锁的逻辑

    InterProcessMutex.release

     public void release() throws Exception {
            Thread currentThread = Thread.currentThread();
            LockData lockData = threadData.get(currentThread);
            if (lockData == null) {
                // 无法从映射表中获取锁信息,不持有锁
                throw new IllegalMonitorStateException("You do not own the lock:" + basePath);
            }
            int newLockCount = lockData.lockCount.decrementAndGet();
            if (newLockCount > 0) {
                // 锁是可重入的,初始值为 1,原子-1 到0,锁才释放
                return;
            }
            if (newLockCount < 0) {
                // 理论上无法执行该路径
                throw new IllegalMonitorStateException("Lock count has gonenegative for lock:" + basePath);
            }
            try {
                // lockData != null && newLockCount == 0,释放锁资源
                internals.releaseLock(lockData.lockPath);
            } finally {
                // 最后从映射表中移除当前线程的锁信息
                threadData.remove(currentThread);
            }
        }
    

    LockInternals.releaseLock

    void releaseLock(String lockPath) throws Exception {
            revocable.set(null);
            // 删除临时顺序节点,只会触发后一顺序节点去 获取锁,理论上不存在竞争,只排队,非抢占,公平锁,先到先得
            deleteOurPath(lockPath);
        }
    
        // Class:LockInternals
        private void deleteOurPath(String ourPath) throws Exception {
            try {
                // 后台不断尝试删除
                client.delete().guaranteed().forPath(ourPath);
            } catch (KeeperException.NoNodeException e) {
                // 已经删除(可能会话过期导致),不做处理
                // 实际使用 Curator-2.12.0 时,并不会抛出该异常
            }
        }
    ```s
    
    
      ——学自咕泡学院
    

    原文

    https://blog.csdn.net/kongmin_123/article/details/82081953

    https://www.jianshu.com/p/068db07caebd

    相关文章

      网友评论

          本文标题:Zookeeper 实现分布式原理及源码分析

          本文链接:https://www.haomeiwen.com/subject/tvgofhtx.html