美文网首页
[Zookeeper] 客户端Curator

[Zookeeper] 客户端Curator

作者: LZhan | 来源:发表于2020-02-23 11:40 被阅读0次

    1 Zookeeper原生客户端不足

    • 在连接zk超时的时候,不支持自动重连,需要手动重连
    • Watch注册一次就会失效,需要反复注册
    • 不支持递归创建节点
    • Curator能提供更多方案并且实现简答,例如分布式锁

    2 会话创建

    1. 使用CuratorFrameworkFactory这个工厂类的两个静态方法来创建一个客户端,如builder().build()或者newClient(),返回CuratorFramework对象

    2. 通过调用CuratorFramework中的start()方法来启动会话

    CuratorFramework curatorFramework= CuratorFrameworkFactory
                    .newClient("",new ExponentialBackoffRetry(1000,3));
            
    curatorFramework.start();
    
    CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectInfo)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
    
    client.start();
    

    3 节点操作

    // 创建数据节点/home,内容为test
    client.create().forPath("/home","test".getBytes());
    // 更新数据节点数据
    client.setData().forPath("/home","test1".getBytes());
    // 获取数据节点信息
    client.getData().forPath("/home");
    // 获取数据节点权限
    client.getACL().forPath("/home");
    // 更新数据节点权限client.setACL().withACL(list).forPath("/home");
    // 删除数据节点/home
    client.delete().forPath("/home");
    

    4 代码演示

    4.1 添加pom依赖

            <!--对zookeeper的底层api的一些封装-->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.12.0</version>
            </dependency>
            <!--封装了一些高级特性,如:Cache时间监听,选举,分布式锁,分布式barrier-->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.12.0</version>
            </dependency>
    

    4.2 代码分析

    1. 连接重试策略
    • RetryForever:一直重试,直至连接成功,一般不使用
    • RetryNTime:指定重连的次数N
    • RetryUtilElapsed:指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者连接成功
    • ExponentialBackoffRetry:基于“backoff”方式重连,重连的时间间隔是动态的
    • BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry,增加了最大重试次数的控制
    1. 默认Curator创建节点是永久节点,如果需要创建临时节点
    client.create().withMode(CreateMode.EPHEMERAL).forPath("/aa");
    

    一般执行完毕之后,session就会关闭了,创建的临时节点就不存在了。


    creatingParentsIfNeeded()方法的意思是如果父节点不存在,则在创建节点的同时创建父节点

    client.create().creatingParentsIfNeeded().forPath("/test");
    

    旧版本中 creatingParentContainersIfNeeded()等于creatingParentsIfNeeded()
    新版本中 creatingParentContainersIfNeeded()以容器模式递归创建节点


    删除节点时,如果当前节点还存在子节点,那么删除时会报错,如果要删除的话,就得加上deletingChildrenIfNeeded()

    client.delete().deletingChildrenIfNeeded().forPath("/test");
    

    在连接失败的情况下,可能删除失败,如果确定删除的,则加上guaranteed(),则在不停重试中,不停去删除节点。


    使用Stat,则利用storingStatIn

     Stat stat=new Stat();
    byte[] bytes1=client.getData().storingStatIn(stat).forPath("/aa/bb");
    System.out.println(new String(bytes1));
    System.out.println(stat.getCversion());
    

    5 分布式锁

    public class DistributedLockTest {
    
    
        public static void main(String[] args) throws Exception {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory
                    .builder()
                    .connectString("localhost:2181")
                    .connectionTimeoutMs(5000)
                    .sessionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
    
            client.start();
    
            String path = "/orderNo";
            InterProcessMutex mutex = new InterProcessMutex(client, path);
    
    
            Stat stat = client.checkExists().forPath(path);
            if (stat == null) {
                client.create().forPath(path, "1000".getBytes());
            } else {
                client.setData().forPath(path, "1000".getBytes());
            }
    
            for (int i = 0; i < 10; i++) {
    
                new Thread(new Runnable() {
                    @Override
                    public void run() {
    
                        try {
                            mutex.acquire();
    
                            String numStr = new String(client.getData().forPath(path));
                            int orderNum = Integer.parseInt(numStr);
                            String currentNum = orderNum + 1 + "";
                            client.setData().forPath(path, currentNum.getBytes());
    
                            System.out.println("currentNum = " + new String(client.getData().forPath(path)));
    
                            mutex.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
                
            }
        }
    }
    

    源码分析:

    1. 初始化InterProcessMutex
    1. 调用InterProcessMutexacquire方法

    实际上调用的是internalLock方法
    internalLock --》LockInternalsattemptLock --》createsTheLockinternalLockLoop

    StandardLockInternalsDrivercreatesTheLock方法:创建临时顺序节点

        @Override
        public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
        {
            String ourPath;
            if ( lockNodeBytes != null )
            {
                ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
            }
            else
            {
                ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
            }
            return ourPath;
        }
    

    相关文章

      网友评论

          本文标题:[Zookeeper] 客户端Curator

          本文链接:https://www.haomeiwen.com/subject/roicqhtx.html