美文网首页
Zookeeper 客户端 Curator

Zookeeper 客户端 Curator

作者: zfylin | 来源:发表于2019-06-18 09:26 被阅读0次

Zookeeper 客户端 Curator

概述

CuratorNetflix公司开源的一套Zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册WatcherNodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。

Curator包含了几个包:

  • curator-framework:对Zookeeper的底层API的一些封装。
  • curator-client:提供一些客户端的操作,例如重试策略等。
  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

客户端

创建

静态方法创建

// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

Fluent API 创建

// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();

创建包含命名空间的客户端

// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 命名空间,即该客户端可操作目录的根目录
                .build();

启动

client.start();

节点

Zookeeper 节点类型

  • PERSISTENT: 持久化
  • PERSISTENT_SEQUENTIAL: 持久化并且带序列号
  • EPHEMERAL: 临时
  • EPHEMERAL_SEQUENTIAL: 临时并且带序列号

创建

// 创建节点,内容为空
client.create().forPath("path");
// 创建节点,内容为“init”
client.create().forPath("path","init".getBytes());
// 创建临时节点,内容为空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
// 创建临时节点,内容为“init”
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());
// 创建临时节点,内容为“init”,递归创建父节点
client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

删除

// 删除节点
client.delete().forPath("path");
// 删除节点,递归删除子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
// 删除制定版本节点
client.delete().withVersion(10086).forPath("path");
// 强制删除节点
client.delete().guaranteed().forPath("path");

读取

// 读取节点内容
client.getData().forPath("path");
// 读取节点内容及状态
client.getData().storingStatIn(stat).forPath("path");

更新

// 更新节点
client.setData().forPath("path","data".getBytes());
// 指定版本更新节点
client.setData().withVersion(10086).forPath("path","data".getBytes());

节点是否存在

client.checkExists().forPath("path");

获取所有子节点

client.getChildren().forPath("path");

事务

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

异步接口

Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。

CuratorEventType

事件类型 对应CuratorFramework实例的方法
CREATE create()
DELETE delete()
EXISTS checkExists()
GET_DATA getData()
SET_DATA setData()
CHILDREN getChildren()
SYNC sync(String,Object)
GET_ACL getACL()
SET_ACL setACL()
WATCHED Watcher(Watcher)
CLOSING close()

响应码

响应码 意义
0 OK,即调用成功
-4 ConnectionLoss,即客户端与服务端断开连接
-110 NodeExists,即节点已经存在
-112 SessionExpired,即会话过期
Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)  //如果inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理
      .forPath("path");

缓存

强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下的所有Api将会失效或者过期,尽管后面所有的例子都没有使用到ConnectionStateListener。

Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。

Path Cache

Path Cache用来监控一个ZNode的子节点。当一个子节点增加,更新,删除时,Path Cache会改变它的状态, 会包含最新的子节点,子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

实际使用时会涉及到四个类:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必须调用它的start方法,使用完后调用close方法。 可以设置StartMode来实现启动的模式。

StartMode有下面几种:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在调用start()之前会调用rebuild()
  3. POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener) 可以增加listener监听缓存的变化。

getCurrentData()方法返回一个List<ChildData>对象,可以遍历所有的子节点。

设置/更新、移除其实是使用client (CuratorFramework)来操作, 不通过PathChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {

        // 创建客户端
        String connectInfo = "localhost:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .build();

        // 启动
        client.start();

        // cache
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        // cache start
        cache.start();
        // cache listener
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);

        // test
        client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());
        ThreadUtil.sleep(10);
        client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());
        ThreadUtil.sleep(10);
        client.setData().forPath(PATH + "/test01", "01_V2".getBytes());
        ThreadUtil.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath(PATH + "/test01");
        ThreadUtil.sleep(10);
        client.delete().forPath(PATH + "/test02");
        ThreadUtil.sleep(10);
        
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。

注意:示例中的Thread.sleep(10)可以注释掉,但是注释后事件监听的触发次数会不全,这可能与PathCache的实现原理有关,不能太过频繁的触发事件!

Node Cache

Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:

  • NodeCache - Node Cache实现类
  • NodeCacheListener - 节点监听器
  • ChildData - 节点数据

注意:使用cache,依然要调用它的start()方法,使用完后调用close()方法。

getCurrentData()将得到节点当前的状态,通过它的状态可以得到当前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/nodeCache";

    public static void main(String[] args) throws Exception {

        // 创建客户端
        String connectInfo = "localhost:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .build();

        // 启动
        client.start();

        client.create().creatingParentsIfNeeded().forPath(PATH);

        // cache
        NodeCache cache = new NodeCache(client, PATH);
        // cache start
        cache.start();
        // cache listener
        NodeCacheListener cacheListener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(cacheListener);

        // test
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(10);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(10);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(10);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:示例中的Thread.sleep(10)可以注释,但是注释后事件监听的触发次数会不全,这可能与NodeCache的实现原理有关,不能太过频繁的触发事件!

注意:NodeCache只能监听一个节点的状态变化。

Tree Cache

Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合,主要涉及到下面四个类:

  • TreeCache - Tree Cache实现类
  • TreeCacheListener - 监听器类
  • TreeCacheEvent - 触发的事件类
  • ChildData - 节点数据
public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {

        // 创建客户端
        String connectInfo = "localhost:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .build();

        // 启动
        client.start();

        client.create().creatingParentsIfNeeded().forPath(PATH);

        // cache
        TreeCache cache = new TreeCache(client, PATH);
        // cache start
        cache.start();
        // cache listener
        TreeCacheListener cacheListener = (client1, event) -> System.out.println("事件类型:" + event.getType() +
                " | 路径:" + (null != event.getData() ? event.getData().getPath() : null) +
                " | 数据:" + (null != event.getData() ? new String(event.getData().getData()) : null));

        cache.getListenable().addListener(cacheListener);

        // test
        client.setData().forPath(PATH, "01".getBytes());
//        Thread.sleep(10);
        client.setData().forPath(PATH, "02".getBytes());
//        Thread.sleep(10);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
//        Thread.sleep(10);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中没有使用Thread.sleep(10),但是事件触发次数也是正常的。

注意:TreeCache在初始化(调用start()方法)的时候会回调TreeCacheListener实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()很有可能导致空指针异常,这里应该主动处理并避免这种情况。

选举

分布式锁

分布式队列

分布式计数器

参考

zookeeper-curator-usage
curator.apache.org

相关文章

网友评论

      本文标题:Zookeeper 客户端 Curator

      本文链接:https://www.haomeiwen.com/subject/xfrxqctx.html