美文网首页
超级简单的分布式锁实现-zookeeper

超级简单的分布式锁实现-zookeeper

作者: DoubleFooker | 来源:发表于2019-10-08 07:56 被阅读0次

    使用zookeeper实现

    使用zookeeper实现分布式下锁的抢占。

    使用curator实现

    添加maven依赖

     <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.12</version>
            </dependency>
    

    需要注意的是4.0.0版本依赖zk3.5.3-beta版本,如果运行的zk不是这个版本需要引入对应兼容的版本。

    public class DistributedLockUtil {
        // 集群地址
        public static final String ZOOKEEPER_STRING = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        // 可重入锁实现类
        private InterProcessMutex lock;
        // 锁的路径
        private String lockPath = "/locks";
    
        public DistributedLockUtil(CuratorFramework client) {
            this.lock = new InterProcessMutex(client, lockPath);
        }
    
        public InterProcessMutex getLock() {
            return lock;
        }
    
        public static void main(String[] args) {
            CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZOOKEEPER_STRING,
                    10000,// session断开保留时长
                    10000,// 链接超时
                    new RetryForever(10000)// 重试机制
            );
            curatorFramework.start();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    DistributedLockUtil distributedLockUtil = new DistributedLockUtil(curatorFramework);
                    InterProcessMutex lock = distributedLockUtil.getLock();
                    try {
                        lock.acquire();
                        System.out.println(Thread.currentThread().getName() + "-获得锁!");
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }, "Thread:" + i).start();
                countDownLatch.countDown();
    
            }
    
        }
    }
    

    curator提供InterProcessMutex实现分布式可重入锁的机制。
    接下来解读下使用Zookeeper实现分布式锁的常见逻辑。

    ZK的节点

    zookeeper数据以树状节点的形式存储,并且相同目录下节点不糊会重复,其中节点类型分为

    • 持久节点:客户端断开链接节点不会被删除
    • 持久有序节点:客户端断开链接节点不会被删除,且创建的节点有顺序序号。
    • 临时节点:客户端断开链接节点被删除
    • 临时有序节点:客户端断开链接节点被删除,且创建的节点有顺序序号。

    watcher机制

    客户端注册监听它关心的目录节点,包括数据变更、删除、新增、子节点变化等,zookeeper会触发通知客户端。并且每个监听的watcher只会触发一次。

    公平锁

    基于ZK几点的特性,以及watcher机制,能够很方便的实现公平锁的竞争。如下:
    客户端在父节点/locks下创建临时有序节点,获取/locks节点下所有子节点,判断自己是否为序号最小节点,如果是最小序号,则获得锁,如果不是,则监听比自己节点小一号的节点,添加删除的watcher事件。当获得锁的客户端释放锁或则断开链接,则会触发它的下一个节点的watcher事件,让下一个节点获得区锁。
    由于watcher事件是一次性的,如果当前客户端的上一个节点,还没获得锁就已经断开链接,这时也会触发watcher事件,而这时并不是锁释放的型号,所以代码逻辑里需要在watcher事件处理里再次判断当前节点是否为最小节点,如果不是继续监听比自己小的上一个节点。这样子就形成了一个按客户端创建节点顺序排列的队列,实现锁的公平竞争。
    惊群效应
    这里有一点需要注意下,如果我们所有为获得锁的客户端的watcher事件监听的是/locks节点的子节点的变化事件。那么如果这时候有上千个客户端在等待获得锁,那么一档持有锁的客户端释放锁,所有等待的客户端都会触发watcher事件,这显然是不合理的,这时并不是所有的客户端都需要重新发起锁的获取。这就是所谓的惊群效应。
    我们可以跟踪下curatorInterProcessMutex实现原理,其逻辑就是公平锁的处理思想,主要逻辑在LockInternals#internalLockLoop

    // 上一步已创建临时节点,ourPath
     private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
        {
            boolean     haveTheLock = false;
            boolean     doDelete = false;
            try
            {
                if ( revocable.get() != null )
                {
                    client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
                }
    
                while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
                {
                    //获得有序的所有最节点信息
                    List<String>        children = getSortedChildren();
    // 当前客户端序号
                    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
    // 判断是否为最小节点,是则获得锁,否则监听
                    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    if ( predicateResults.getsTheLock() )
                    {
                        haveTheLock = true;
                    }
                    else
                    {
                        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
    
                        synchronized(this)
                        {
                            try 
                            {
                                //监听比自己小的上一个节点的事件
    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                                if ( millisToWait != null )
                                {
                                    millisToWait -= (System.currentTimeMillis() - startMillis);
                                    startMillis = System.currentTimeMillis();
                                    if ( millisToWait <= 0 )
                                    {
                                        doDelete = true;    // timed out - delete our node
                                        break;
                                    }
    
                                    wait(millisToWait);
                                }
                                else
                                {
    // 等待,当watcher事件触发时唤醒,继续while判断
                                    wait();
                                }
                            }
                            catch ( KeeperException.NoNodeException e ) 
                            {
                                // it has been deleted (i.e. lock released). Try to acquire again
                            }
                        }
                    }
                }
            }
            catch ( Exception e )
            {
    // 异常释放锁
                ThreadUtils.checkInterrupted(e);
                doDelete = true;
                throw e;
            }
            finally
            {
                if ( doDelete )
                {
                    deleteOurPath(ourPath);
                }
            }
            return haveTheLock;
        }
    

    代码比较建档,看下注释可以跟踪代码的实现

    非公平锁

    上线讲到的是公平锁竞争的实现,那么如何实现非公平锁的竞争呢。其实很简单,我们只需要稍微改变下思路即可。首先还是获得锁的节点还是临时节点,但是非公平锁只有一个节点/lock。所有客户端都尝试创建这个/lock节点,如果创建失败,即节点已经存在。则添加/lock节点的watcher事件,当触发watcher事件时,所有需要竞争锁的客户端,再次尝试创建节点。这样就是先了锁的非公平竞争。

    相关文章

      网友评论

          本文标题:超级简单的分布式锁实现-zookeeper

          本文链接:https://www.haomeiwen.com/subject/kuwmpctx.html