美文网首页
超级简单的分布式锁实现-zookeeper

超级简单的分布式锁实现-zookeeper

作者: DoubleFooker | 来源:发表于2019-10-08 07:56 被阅读0次

使用zookeeper实现

使用zookeeper实现分布式下锁的抢占。

使用curator实现

添加maven依赖

 <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>

需要注意的是4.0.0版本依赖zk3.5.3-beta版本,如果运行的zk不是这个版本需要引入对应兼容的版本。

public class DistributedLockUtil {
    // 集群地址
    public static final String ZOOKEEPER_STRING = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    // 可重入锁实现类
    private InterProcessMutex lock;
    // 锁的路径
    private String lockPath = "/locks";

    public DistributedLockUtil(CuratorFramework client) {
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public InterProcessMutex getLock() {
        return lock;
    }

    public static void main(String[] args) {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(ZOOKEEPER_STRING,
                10000,// session断开保留时长
                10000,// 链接超时
                new RetryForever(10000)// 重试机制
        );
        curatorFramework.start();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                DistributedLockUtil distributedLockUtil = new DistributedLockUtil(curatorFramework);
                InterProcessMutex lock = distributedLockUtil.getLock();
                try {
                    lock.acquire();
                    System.out.println(Thread.currentThread().getName() + "-获得锁!");
                    TimeUnit.SECONDS.sleep(10);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread:" + i).start();
            countDownLatch.countDown();

        }

    }
}

curator提供InterProcessMutex实现分布式可重入锁的机制。
接下来解读下使用Zookeeper实现分布式锁的常见逻辑。

ZK的节点

zookeeper数据以树状节点的形式存储,并且相同目录下节点不糊会重复,其中节点类型分为

  • 持久节点:客户端断开链接节点不会被删除
  • 持久有序节点:客户端断开链接节点不会被删除,且创建的节点有顺序序号。
  • 临时节点:客户端断开链接节点被删除
  • 临时有序节点:客户端断开链接节点被删除,且创建的节点有顺序序号。

watcher机制

客户端注册监听它关心的目录节点,包括数据变更、删除、新增、子节点变化等,zookeeper会触发通知客户端。并且每个监听的watcher只会触发一次。

公平锁

基于ZK几点的特性,以及watcher机制,能够很方便的实现公平锁的竞争。如下:
客户端在父节点/locks下创建临时有序节点,获取/locks节点下所有子节点,判断自己是否为序号最小节点,如果是最小序号,则获得锁,如果不是,则监听比自己节点小一号的节点,添加删除的watcher事件。当获得锁的客户端释放锁或则断开链接,则会触发它的下一个节点的watcher事件,让下一个节点获得区锁。
由于watcher事件是一次性的,如果当前客户端的上一个节点,还没获得锁就已经断开链接,这时也会触发watcher事件,而这时并不是锁释放的型号,所以代码逻辑里需要在watcher事件处理里再次判断当前节点是否为最小节点,如果不是继续监听比自己小的上一个节点。这样子就形成了一个按客户端创建节点顺序排列的队列,实现锁的公平竞争。
惊群效应
这里有一点需要注意下,如果我们所有为获得锁的客户端的watcher事件监听的是/locks节点的子节点的变化事件。那么如果这时候有上千个客户端在等待获得锁,那么一档持有锁的客户端释放锁,所有等待的客户端都会触发watcher事件,这显然是不合理的,这时并不是所有的客户端都需要重新发起锁的获取。这就是所谓的惊群效应。
我们可以跟踪下curatorInterProcessMutex实现原理,其逻辑就是公平锁的处理思想,主要逻辑在LockInternals#internalLockLoop

// 上一步已创建临时节点,ourPath
 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //获得有序的所有最节点信息
                List<String>        children = getSortedChildren();
// 当前客户端序号
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 判断是否为最小节点,是则获得锁,否则监听
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            //监听比自己小的上一个节点的事件
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
// 等待,当watcher事件触发时唤醒,继续while判断
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
// 异常释放锁
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

代码比较建档,看下注释可以跟踪代码的实现

非公平锁

上线讲到的是公平锁竞争的实现,那么如何实现非公平锁的竞争呢。其实很简单,我们只需要稍微改变下思路即可。首先还是获得锁的节点还是临时节点,但是非公平锁只有一个节点/lock。所有客户端都尝试创建这个/lock节点,如果创建失败,即节点已经存在。则添加/lock节点的watcher事件,当触发watcher事件时,所有需要竞争锁的客户端,再次尝试创建节点。这样就是先了锁的非公平竞争。

相关文章

网友评论

      本文标题:超级简单的分布式锁实现-zookeeper

      本文链接:https://www.haomeiwen.com/subject/kuwmpctx.html