美文网首页
5分钟学习zookeeper:使用Curator事半功倍

5分钟学习zookeeper:使用Curator事半功倍

作者: JerrysCode | 来源:发表于2020-12-27 22:55 被阅读0次

    Apache Curator被称为zookeeper客户端开发的瑞士军刀。该框架把zookeeper原生的api封装为highlevel的api,使操作zookeeper更加高效便捷。

    Patrixck Hunt(Zookeeper commiter)认为Curator对zookeeper的价值就像Guava对java的价值。


    ph-quote.png

    Curator优点

    1. Curator封装了zookeeper的原生api,帮程序员屏蔽了底层细节,如果断链重连,反复注册Watcher,NodeExistsException等,极大提高了开发效率
    2. Curator支持事务,一个事务中的操作要么全部成功,要么全部失败
    3. Curator针对zookeeper的常见应用场景(如leader选举,分布式锁等)提供了代码框架(Recipes)。

    基本操作代码示例

    1. 引入jar包
      这里通过maven引入最新版本的curator-5.0.0
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-recipes</artifactId>
          <version>5.0.0</version>
        </dependency>
    
    1. 创建会话
    //重试策略,每隔1000ms重试一次,总共重试10次
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")   //zookeeper服务器地址
                    .retryPolicy(retryPolicy)   //重试策略
                    .connectionTimeoutMs(30 * 1000)  //连接超时时间
                    .sessionTimeoutMs(3 * 1000)   //会话超时时间
                    .build();
    
    client.start();
    
    1. 创建节点
      创建空节点是可以不指定节点的值,也可以指定节点值
    client.create().forPath("/myNodeWithoutData");
    client.create().forPath("/myPermanentNode", "192.168.1.70".getBytes());
    client.create().withMode(CreateMode.EPHEMERAL).forPath("/myTempNode", "192.168.10.20".getBytes());
    client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/mySequentialNode", "192.168.10.21".getBytes());
    
    1. 设置节点
    client.setData().forPath("/myNodeWithoutData", "192.168.0.50".getBytes());
    client.create().orSetData().creatingParentContainersIfNeeded().forPath("/myParentNode/myChildNode_1", "child-1".getBytes());
     client.create().orSetData().creatingParentContainersIfNeeded().forPath("/myParentNode/myChildNode_2", "child-2".getBytes());
    
    1. 获取节点存储的值
    String data = new String(client.getData().forPath("/myPermanentNode"));
    System.out.println(data);
    
    List<String> children = client.getChildren().forPath("/");
    children.stream().forEach(System.out::println);
    
    1. 判断节点是否存在
    Stat stat = client.checkExists().forPath("/myNodeWithoutData");
    System.out.println(null == stat ? "exists" : "not exists");
    
    1. 删除节点
      如果要删除一个包含字节的节点,可以使用deletingChildrenIfNeeded方法
    client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/myNode");
    

    事务操作

    如下示例代码中的createOp,setDataOp,deleteOp位于一个事务中,这个三个操作要么同时成功生效,要么同时失败回滚

    CuratorOp createOp = client.transactionOp().create().forPath("/myNode");
    CuratorOp setDataOp = client.transactionOp().setData().forPath("/myNode", "192.168.0.1".getBytes());
    CuratorOp deleteOp = client.transactionOp().delete().forPath("/AAA");
    
    List<CuratorTransactionResult> result = client.transaction().forOperations(createOp, setDataOp, deleteOp);
    result.stream().forEach(rt -> System.out.println(rt.getForPath() + "---" + rt.getType()));
    

    节点事件监听

    Curator框架通过CuratorCache缓存程序指定的节点到本地,并在数据变化时通知Listener。
    CuratorCacheListener监听节点及其所有子节点的创建、删除以及数据更新

    try(CuratorCache cache = CuratorCache.builder(client, "/node/child").build()){
                CuratorCacheListener listener = CuratorCacheListener.builder()
                        .forCreates(childData -> System.out.println(String.format("creating node %s", childData)))
                        .forChanges((oldNode, newNode) -> System.out.println(String.format("change %s to %s", oldNode, newNode)))
                        .forDeletes(node -> System.out.println(String.format("deleted node %s", node)))
                        .forInitialized(() -> System.out.println("Initialized"))
                        .build();
    
                cache.listenable().addListener(listener, pool);
                cache.start();
    }
    

    相关文章

      网友评论

          本文标题:5分钟学习zookeeper:使用Curator事半功倍

          本文链接:https://www.haomeiwen.com/subject/cuyanktx.html