  • curator-framework:对zookeeper的底层api的一些封装。
  • curator-client:提供一些客户端的操作,例如重试策略等。
  • curator-recipes:封装了一些高级特性,例如Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。


        <!-- zookeeper -->
        <!-- 对zookeeper的底层api的一些封装 -->
        <!-- 提供一些客户端的操作,例如重试策略等 -->
        <!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等 -->

一 Curator的基本用法

1.1 创建zookeeper客户端


CuratorFramework api介绍如下

public interface CuratorFramework {

     * 启动zookeeper客户端
    public void start();

     * 关闭zookeeper客户端
    public void close();

    public CuratorFrameworkState getState();

     * 客户端是否已经启动
    public boolean isStarted();

     * 创建节点的建造器
    public CreateBuilder create();

     * 删除节点的建造器
    public DeleteBuilder delete();

     * 检查节点是否存在的建造器
    public ExistsBuilder checkExists();

     * 获取接连数据的建造器
    public GetDataBuilder getData();

     * 设置节点数据的建造器
    public SetDataBuilder setData();

     * 获取子节点的建造器
    public GetChildrenBuilder getChildren();

     * 获取权限的构造器
    public GetACLBuilder getACL();

     * 设置权限的构造器
    public SetACLBuilder setACL();

     * 重新配置的建造器
    public ReconfigBuilder reconfig();

     * 获取配置的建造器
    public GetConfigBuilder getConfig();

     * 事务构造器
     * @deprecated use {@link #transaction()} instead
    public CuratorTransaction inTransaction();

     * 事务构造器
    public CuratorMultiTransaction transaction();

     * 分配可与{transaction()}一起使用的操作
    public TransactionOp transactionOp();

     * 如果路径不存在,则创建路径对应的节点
    public void createContainers(String path) throws Exception;

     * 启动同步构建器。注意:即使您不使用其中一种background()方法,同步也始终在后台
    public SyncBuilder sync();

     * 启动remove watch builder,有节点删除的时候会调用
    public RemoveWatchesBuilder watches();

     * 返回Connect State的可侦听接口
    public Listenable<ConnectionStateListener> getConnectionStateListenable();

     * 返回事件的可侦听接口
    public Listenable<CuratorListener> getCuratorListenable();

     * 返回未处理错误的可侦听接口
    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();

     * 返回一个新的CuratorFramework,该CuratorFramework指定了一个新的命名空间
    public CuratorFramework usingNamespace(String newNamespace);

     * 获取命名空间
    public String getNamespace();

     * 返回托管的zookeeper客户端
    public CuratorZookeeperClient getZookeeperClient();

     * 阻塞,直到与ZooKeeper的连接可用或已超过maxWaitTime
    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;

     * 阻塞,直到与ZooKeeper的连接可用。在连接可用或中断之前,此方法不会返回,在这种情况下,将抛出InterruptedException
    public void blockUntilConnected() throws InterruptedException;

     * 返回跟踪观察者创建的当前实例的外观,并允许一次性删除所有观察者
    public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework();

     * 返回配置的错误策略
    public ConnectionStateErrorPolicy getConnectionStateErrorPolicy();

     * Current维护Zookeeper仲裁配置的缓存视图。
    public QuorumVerifier getCurrentConfig();

     * 获取SchemaSet
    SchemaSet getSchemaSet();

     * 如果此实例在ZK 3.4.x兼容模式下运行,则返回true
    boolean isZk34CompatibilityMode();


CuratorFrameworkFactory api介绍如下

public class CuratorFrameworkFactory {

     * 用于通过建造者模式创建zookeeper客户端
    public static Builder builder();

     * 创建zookeeper客户端
    public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy);

     * 创建zookeeper客户端
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);

     * 将本地地址作为可用作节点有效负载的字节返回
    public static byte[] getLocalAddress();

    public static class Builder {

         * build CuratorFramework对象 -- zookeeper客户端
        public CuratorFramework build();

         * 创建一个临时的CuratorFramework客户端,CuratorFramework,默认3分钟不活动客户端连接就被关闭
        public CuratorTempFramework buildTemp();

         * 创建一个临时的CuratorFramework客户端,CuratorFramework,可以自己指定多长时间不活动客户端连接就被关闭
        public CuratorTempFramework buildTemp(long inactiveThreshold, TimeUnit unit);

         * 添加zookeeper 访问权限
        public Builder authorization(String scheme, byte[] auth);

        public Builder authorization(List<AuthInfo> authInfos);

         * 设置zookeeper服务器列表
        public Builder connectString(String connectString);

         * zookeeper服务器地址通过EnsembleProvider(配置提供者)来提供,不能和connectString共同使用
        public Builder ensembleProvider(EnsembleProvider ensembleProvider);

         * 为每次新建的节点设置一个默认值
        public Builder defaultData(byte[] defaultData);

         * 设置命名空间,为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间
        public Builder namespace(String namespace)

         * 会话超时时间,单位毫秒,默认60000ms
        public Builder sessionTimeoutMs(int sessionTimeoutMs);

         * 连接创建超时时间,单位毫秒,默认60000ms
        public Builder connectionTimeoutMs(int connectionTimeoutMs);

         * @param maxCloseWaitMs time to wait during close to join background threads
         * @return this
        public Builder maxCloseWaitMs(int maxCloseWaitMs);

         * 设置客户端重连策略
        public Builder retryPolicy(RetryPolicy retryPolicy);

         * Executor Services的线程工厂
        public Builder threadFactory(ThreadFactory threadFactory);

         * 压缩器,用于压缩和解压数据
        public Builder compressionProvider(CompressionProvider compressionProvider);

         * ZookeeperFactory 用于创建ZooKeeper
        public Builder zookeeperFactory(ZookeeperFactory zookeeperFactory);

         * 权限控制器
        public Builder aclProvider(ACLProvider aclProvider);

         * 设置只读模式
        public Builder canBeReadOnly(boolean canBeReadOnly);

         * 不让客户端,创建节点的时候顺带创建父节点
        public Builder dontUseContainerParents();

         * 默认是StandardConnectionStateErrorPolicy,设置要使用的错误策略
        public Builder connectionStateErrorPolicy(ConnectionStateErrorPolicy connectionStateErrorPolicy);

         * 如果mode为true,则创建ZooKeeper 3.4.x兼容客户端。如果使用的客户端库是ZooKeeper 3.4.x 默认情况下已启用
        public Builder zk34CompatibilityMode(boolean mode);

         * 更改连接处理策略,默认StandardConnectionHandlingPolicy
        public Builder connectionHandlingPolicy(ConnectionHandlingPolicy connectionHandlingPolicy);

         * 添加强制架构集
        public Builder schemaSet(SchemaSet schemaSet);

       从上面的CuratorFrameworkFactory api的介绍可以看出CuratorFrameworkFactory对象的创建有两种方式:

  • 通过过构造函数创建
参数 类型 含义
connectString String 服务器列表,格式host1:port1,host2:port2,…
sessionTimeoutMs int 会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs int 连接创建超时时间,单位毫秒,默认60000ms
retryPolicy RetryPolicy 重试策略,curator已经提供了多种重试策略,也可以自行实现RetryPolicy接口


  • 通过build创建,关于build里面的各个参数在CuratorFrameworkFactory api里面都顺带介绍了哦。



        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()

1.2 启动客户端


1.3 节点操作


  • CreateMode.PERSISTENT:持久化节点。
  • CreateMode.PERSISTENT_SEQUENTIAL:持久化并且带序列号节点。
  • CreateMode.EPHEMERAL:临时节点(客户端断开了节点也就删除了)
  • CreateMode.EPHEMERAL_SEQUENTIAL:临时并且带序列号(客户端断开了节点也就删除了)

1.3.1 创建节点



     * 同步 创建持久化节点
     * @param path 节点路径
     * @throws Exception errors
    public void createPersistentNodeSync(String path) throws Exception {
                .creatingParentContainersIfNeeded() // 自动递归创建父节点


     * 同步-创建持久化节点
     * @param path 节点路径
     * @param data 节点对应的值
     * @throws Exception errors
    public void createPersistentNodeSync(String path, byte[] data) throws Exception {
                .creatingParentContainersIfNeeded() // 自动递归创建父节点
                .forPath(path, data);


1.3.2 删除节点


     * 同步-删除一个叶子节点(注意哦,只能删除叶子节点否则报错的)
     * @param path 需要删除的节点对应的路径
     * @throws Exception errors
    public void deleteNodeSync(String path) throws Exception {


     * 同步-删除一个节点,并且递归删除其所有的子节点
     * @param path 需要删除的节点对应的路基
     * @throws Exception errors
    public void deleteNodeRecursivelySync(String path) throws Exception {


1.3.3 判断节点是否存在


     * 同步-检查节点是否存在
     * @param path 节点路径
     * @return 节点是否存在
     * @throws Exception errors
    public boolean isNodeExistSync(String path) throws Exception {
        Stat state = client.checkExists()
        return state != null;

1.3.4 节点数据操作


     * 同步-读取一个节点的数据内容
     * @param path 节点路基
     * @return 节点内容
     * @throws Exception errors
    public byte[] getNodeDataSync(String path) throws Exception {
        return client.getData()


     * 同步-更新一个节点的数据内容
     * @param path 节点路径
     * @param data 节点对应数据
     * @throws Exception errors
    public void updateNodeDataSync(String path, byte[] data) throws Exception {
                .forPath(path, data);

1.3.5 获取节点的所有子节点

     * 同步-获取某个节点的所有子节点路径
     * @param path 目录
     * @return children
     * @throws Exception errors
    public List<String> getChildrenSync(String path) throws Exception {
        return client.getChildren()

1.4 事务



    public void transaction() throws Exception {
        CuratorOp createOp = client.transactionOp().create().forPath("/a/path", "some data".getBytes());
        CuratorOp setDataOp = client.transactionOp().setData().forPath("/another/path", "other data".getBytes());
        CuratorOp deleteOp = client.transactionOp().delete().forPath("/yet/another/path");

        Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp, setDataOp, deleteOp);
        for (CuratorTransactionResult result : results) {
            System.out.println(result.getForPath() + " - " + result.getType());

1.5 异步操作



     * 异步-获取某个节点的所有子节点路径
     * @param path     目录
     * @param callback 回调
     * @throws Exception errors
    public void getChildrenAsync(String path, BackgroundCallback callback) throws Exception {

     * 异步-获取某个节点的所有子节点路径
     * @param path     目录
     * @param callback 回调
     * @param executor 回调在哪里执行
     * @throws Exception errors
    public void getChildrenAsync(String path, BackgroundCallback callback, Executor executor) throws Exception {
                .inBackground(callback, executor)


二 Curator高级特性

       Curator里面的curator-recipes ja包封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等等。而且这些特性都是在分布式系统里面常用的功能了。

2.1 Cache事件监听


2.1.1 Path Cache

       Path Cache用来监控子节点.当一个子节点增加, 更新,删除时, Path Cache会改变它的状态,会包含最新子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

       Path Cache的使用非常的简单,主要涉及到四个类:

  • PathChildrenCache:Path Cache听实现类
  • PathChildrenCacheEvent:子节点事件
  • PathChildrenCacheListener: 子节点监听
  • ChildData:子节点信息

       关于Path Cache的使用,我们用一个实例来简单的说明下,实例里面也只是简单的创建了一个节点。最终监听到节点的创建.

    public void pathChildrenCache() throws Exception {

        // 创建zookeeper客户端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
        // 启动客户端

        PathChildrenCache cache = new PathChildrenCache(client, "/tuacy/pathCache", true);
        // 添加监听
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("事件类型:" + event.getType());
                if (null != event.getData()) {
                    System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
        // 添加节点
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);

2.1.2 Node Cache

       Node Cache与Path Cache类似,Node Cache只是监听某一指定的节点。子节点的变化它是不会管的。

       Node Cache的使用涉及到下面的三个类:

  • NodeCache - Node Cache实现类
  • NodeCacheListener - 节点监听器
  • ChildData - 节点数据


    public void nodeCache() throws Exception {

        // 创建zookeeper客户端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
        // 启动客户端

        final NodeCache cache = new NodeCache(client, "/tuacy/nodeCache");
        // 添加监听
        cache.getListenable().addListener(new NodeCacheListener() {

            public void nodeChanged() throws Exception {
                ChildData data = cache.getCurrentData();
                if (null != data) {
                    System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
                } else {
        // 添加节点
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        client.setData().forPath("/tuacy/nodeCache", "abc".getBytes());
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);

2.1.3 Tree Cache

       Tree Cache可以监控整个树上的所有节点,就是PathCache和NodeCache的组合功能。

       Tree Cache的使用涉及到下面四个类。

  • TreeCache - Tree Cache实现类
  • TreeCacheListener - 监听器类
  • TreeCacheEvent - 触发的事件类
  • ChildData - 节点数据

       我们还是以具体的实例来说明Tree Cache的使用。

    public void nodeCache() throws Exception {

        // 创建zookeeper客户端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
        // 启动客户端
        final TreeCache cache = TreeCache.newBuilder(client, "/tuacy/treeCache")
        // 添加监听
        cache.getListenable().addListener(new TreeCacheListener() {

            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("事件类型:" + event.getType() + " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        // 添加节点
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        // 给节点设置数据
        client.setData().forPath("/tuacy/treeCache", "abc".getBytes());
        // 创建子节点
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        // 修改子节点的数据
        client.setData().forPath("/tuacy/treeCache/001", "abc".getBytes());
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        // 删除子节点
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        // 删除节点
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);

2.2 Leader选举


  • 主节点是唯一的。
  • 各个节点获取主节点的概率是一样的,一旦某个节点被选为了主节点(Leader),其他的从节点(Slaver)也要能感知到。
  • 一旦主节点断开,其他的从节点重新选出一个主节点。

2.2.1 LeaderLatch

       在不同的zookeeper客户端,使用了相同latch path的LeaderLatch,当中的一个最终会被选举为leader,可以通过hasLeadership方法查看LeaderLatch实例是否leade。也可以在LeaderLatchListener里面监听当前节点是否是leader。使用LeaderLatch的时候如果不想参与选举了要调用close()方法退出选举。

LeaderLatch api介绍

public class LeaderLatch {

     * 构造函数
     * @param client    CuratorFramework
     * @param latchPath 路径,所有参与者同一个路径
    public LeaderLatch(CuratorFramework client, String latchPath);
    public LeaderLatch(CuratorFramework client, String latchPath, String id);
    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode);

     * 参与选举
    public void start() throws Exception;

     * 退出选举
    public void close() throws IOException;

     * 退出选举
     * 关闭方式:SILENT : 静默关闭,不触发相关监听器、NOTIFY_LEADER :关闭时触发监听器
    public synchronized void close(CloseMode closeMode) throws IOException;

     * 添加监听器,监听是否当选为leader
    public void addListener(LeaderLatchListener listener);
    public void addListener(LeaderLatchListener listener, Executor executor);

     * 移除监听器
    public void removeListener(LeaderLatchListener listener);

     * 尝试让当前LeaderLatch实例为leader
    public void await() throws InterruptedException, EOFException
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException;

     * 获取构造函数里面这是的id
    public String getId();

     * 获取当前LeaderLatch实例的状态
    public State getState();

     * 返回所有的参与者
    public Collection<Participant> getParticipants() throws Exception;

     * 返回当前leader节点信息
    public Participant getLeader() throws Exception;

     * 判断实例是否是leader
    public boolean hasLeadership();



    public void leaderLatch() throws Exception {

        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
        List<LeaderLatch> leaderLatchList = Lists.newArrayList();

        // 启动10个zookeeper客户端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
            // 启动客户端

        // 这里我们所有的客户端都参与leader选举
        for (int index = 0; index < zookeeperClientList.size(); index++) {
            // 所有的客户端都参与leader选举
            final LeaderLatch latch = new LeaderLatch(zookeeperClientList.get(index), LEADER_PATH, index + "");
            latch.addListener(new LeaderLatchListener() {
                public void isLeader() {
                    System.out.println("我是leader: " + latch.getId());

                public void notLeader() {
                    System.out.println("我不是leader: " + latch.getId());

        // 30S之后
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
        // 我们找到谁是leader
        String leaderId = leaderLatchList.get(0).getLeader().getId();
        System.out.println("当前leader id : " + leaderId);
        leaderLatchList.forEach(item -> {
            // 这里我们吧leader退出选举,让剩下的重新选举
            if (item.getId().equals(leaderId)) {
                try {
                } catch (IOException e) {

        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
        leaderLatchList.forEach(curatorFramework -> {
            // 退出选举
            try {
            } catch (IOException e) {
        zookeeperClientList.forEach(curatorFramework -> {
            // 关闭客户端


2.2.2 LeaderSelector


  • LeaderSelector:LeaderSelector选举实例类。
  • LeaderSelectorListener:监听选举状态和连接状态
  • LeaderSelectorListenerAdapter:实现了LeaderSelectorListener接口的一个抽象类,封装了客户端与zk服务器连接挂起或者断开时的处理逻辑(抛出抢主失败CancelLeadershipException),一般监听器推荐实现该类。
  • CancelLeadershipException: 抢主失败异常。

LeaderSelector api 介绍

public class LeaderSelector {

     * 构造函数
     * @param client     CuratorFramework
     * @param leaderPath 路径
     * @param listener   监听器
    public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
    public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
    public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);

     * 保证在此实例释放领导权之后还可能获得领导权
    public void autoRequeue();

     * 设置获取当前实例对应的id
    public void setId(String id);
    public String getId();

     * 当前实例参与选举
    public void start();

     * 重新键入到参与者队列里面去选举,如果此实例已在参与者排队里面,则不会发生任何操作并返回false。如果实例未排队,则重新执行该操作并返回true
    public boolean requeue();

     * 退出选举
    public synchronized void close();

     * 获取所有的参与者
    public Collection<Participant> getParticipants() throws Exception;

     * 获取leader
    public Participant getLeader() throws Exception;

     * 当前节点是否是leader
    public boolean hasLeadership();

     * 如果当前实例是leader的话,尝试终断领导权
    public synchronized void interruptLeadership();



public interface ConnectionStateListener {
     * 监听网络连接问题
    public void stateChanged(CuratorFramework client, ConnectionState newState);

 * Notification for leadership
 * @see LeaderSelector
public interface LeaderSelectorListener extends ConnectionStateListener {
     * 当前节点获取到leader权之后调用,注意:在您希望释放领导力之前,此方法不应返回
     * 所以说如果你想一直占有leader权利,就在里面写个无限循环吧
    public void         takeLeadership(CuratorFramework client) throws Exception;


 * 实现了LeaderSelectorListener接口的一个抽象类,封装了客户端与zk服务器连接挂起或者断开时的处理逻辑(抛出抢主失败CancelLeadershipException),一般监听器推荐实现该类
public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
     * 当遇到SUSPENDED以及LOST时直接抛出CancelLeadershipException从而去引发LeaderSelector.interruptLeadership()调用
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if ( client.getConnectionStateErrorPolicy().isErrorState(newState) ) {
            throw new CancelLeadershipException();

       我们还是用一个简单的实例来说明LeaderSelector的用法,我们还是创建10个zookeeper客户端。并且我们创建一个LeaderSelectorAdapter类,在里面当是leader之后的一些处理,如果是leader 10s之后,释放leader权力重新选举。

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter {

    private final LeaderSelector leaderSelector;

    public LeaderSelectorAdapter(CuratorFramework client, String path, String id) {
        // 创建一个LeaderSelector对象
        leaderSelector = new LeaderSelector(client, path, this);
        // 设置id
        // 保证在此实例释放领导权之后还可能获得领导权

     * 参与选举
    public void start() {
        // 参与选举

     * 退出选举
    public void close() {
        // 退出选举

     * 当获得leader的时候,这个方法会被调用。如果还想继续当leader,这个方法不能返回。如果你想要要此实例一直是leader的话可以加一个死循环
    public void takeLeadership(CuratorFramework client) throws Exception {
        System.out.println(leaderSelector.getId() + " 是leader");
        try {
            // 当上leader 5s之后,释放leader权利
        } catch (InterruptedException e) {
            System.err.println(leaderSelector.getId() + " 被中断.");
        } finally {
            System.out.println(leaderSelector.getId() + " 释放leader的权力。");
    private static final String LEADER_PATH = "/tuacy/leaderSelector";

    public void leaderSelector() throws Exception {
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
        List<LeaderSelectorAdapter> leaderLatchList = Lists.newArrayList();

        // 启动10个zookeeper客户端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
            // 启动客户端

        // 这里我们所有的客户端都参与leader选举
        for (int index = 0; index < zookeeperClientList.size(); index++) {
            // 所有的客户端都参与leader选举
            final LeaderSelectorAdapter latch = new LeaderSelectorAdapter(zookeeperClientList.get(index), LEADER_PATH, index + "");

        // 1分钟之后关掉程序
        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
        leaderLatchList.forEach(curatorFramework -> {
            // 退出选举

        zookeeperClientList.forEach(curatorFramework -> {
            // 关闭客户端

2.3 分布式锁


2.3.1 InterProcessMutex


InterProcessMutex api 介绍

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {

     * InterProcessMutex的构造函数,
    public InterProcessMutex(CuratorFramework client, String path);
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);

     * 申请获取锁
    public void acquire() throws Exception;
    public boolean acquire(long time, TimeUnit unit) throws Exception;

     * 如果此JVM中的线程获取了互斥锁,则返回true
    public boolean isAcquiredInThisProcess();

     * 释放锁
    public void release() throws Exception;

     * 返回所有参与获取锁的所有当前节点的排序列表
    public Collection<String> getParticipantNodes() throws Exception;

     * 将锁设为可撤销的. 当别的进程或线程想让你释放锁是Listener会被调用
    public void makeRevocable(RevocationListener<InterProcessMutex> listener);
    public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor);

     * 如果调用线程获取互斥锁,则返回true
    public boolean isOwnedByCurrentThread();


2.3.2 InterProcessSemaphoreMutex


InterProcessSemaphoreMutex api介绍

public class InterProcessSemaphoreMutex implements InterProcessLock {

     * 构造函数
    public InterProcessSemaphoreMutex(CuratorFramework client, String path);

     * 申请获取锁
    public void acquire() throws Exception;

    public boolean acquire(long time, TimeUnit unit) throws Exception;

     * 释放锁
    public void release() throws Exception;

     * 如果此JVM中的线程获取了互斥锁,则返回true
    public boolean isAcquiredInThisProcess();

2.3.3 InterProcessReadWriteLock

       InterProcessReadWriteLock 读写锁。和ReadWriteLock类似。

InterProcessReadWriteLock api 介绍

public class InterProcessReadWriteLock {

     * 读锁
    private final InterProcessMutex readMutex;
     * 写锁
    private final InterProcessMutex writeMutex;
     * 构造函数
    public InterProcessReadWriteLock(CuratorFramework client, String basePath)

     * 构造函数
     * lockData是存储在节点上的数据
    public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData);

     * 获取读锁
    public InterProcessMutex     readLock();

     * 获取写锁
    public InterProcessMutex     writeLock();


2.3.4 信号量(InterProcessSemaphoreV2)



  • InterProcessSemaphoreV2:信号量实现类
  • Lease:租约(单个信号)
  • SharedCountReader:计数器,用于计算最大租约数量

InterProcessSemaphoreV2 api 介绍

public class InterProcessSemaphoreV2 {

     * 构造函数
     * @param client    CuratorFramework
     * @param path      节点路径
     * @param maxLeases 允许此实例的最大租约数
    public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);

     * 构造函数
     * @param client CuratorFramework
     * @param path   节点路径
     * @param count  用于最大租约的共享计数
    public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count);

     * 此信号量创建的节点放置的数据,必须在调用其中一个acquire()方法之前调用它
    public void setNodeData(byte[] nodeData);

     * 返回参与信号量的所有当前节点的列表
    public Collection<String> getParticipantNodes() throws Exception;

     * 关闭给定租约集合中的所有租约
    public void returnAll(Collection<Lease> leases);

     * 关闭租约
    public void returnLease(Lease lease);

     * 获取租约,如果没有租约获取会一直阻塞直到获取到租约
    public Lease acquire() throws Exception;
    public Lease acquire(long time, TimeUnit unit) throws Exception

     * 获取指定数量的租约,如果没有获取到制定数量租约会一直阻塞
    public Collection<Lease> acquire(int qty) throws Exception;
    public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception;

2.3.5 InterProcessMultiLock(多共享锁对象


InterProcessMultiLock api 介绍

public class InterProcessMultiLock implements InterProcessLock {

     * 构造函数
     * @param client CuratorFramework
     * @param paths 节点列表对应的路径(多个路径就是多个锁)
    public InterProcessMultiLock(CuratorFramework client, List<String> paths);

     * 构造函数
    public InterProcessMultiLock(List<InterProcessLock> locks);

     * 请求锁
    public void acquire() throws Exception;
    public boolean acquire(long time, TimeUnit unit) throws Exception;

     * 释放锁
    public synchronized void release() throws Exception;

     * 如果此JVM中的线程获取了所有的锁,则返回true
    public synchronized boolean isAcquiredInThisProcess();

2.4 分布式计数器


2.4.1 SharedCount(int计数器)


  • SharedCount:计数器的具体实现。
  • SharedCountListener:监听数据的改变。

SharedCount api 介绍

public class SharedCount implements Closeable, SharedCountReader, Listenable<SharedCountListener> {

     * 构造函数
     * @param client CuratorFramework
     * @param path 计数器依赖的节点
     * @param seedValue 如果当前节点对应的计数器没有值,就会用该值
    public SharedCount(CuratorFramework client, String path, int seedValue);
    protected SharedCount(CuratorFramework client, String path, SharedValue sv);

     * 获取当前计数
    public int getCount();

     * 获取当前节点对应的版本信息
    public VersionedValue<Integer> getVersionedValue();

     * 设置计数器的值
    public void     setCount(int newCount) throws Exception;

     * 设置计数器的值,这里要注意如果当前版本的值在这个时刻有改变则设置不成功。CAS操作
    public boolean  trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;

     * 添加监听器
    public void     addListener(SharedCountListener listener);
    public void     addListener(final SharedCountListener listener, Executor executor);

     * 移除监听器
    public void     removeListener(SharedCountListener listener);

     * 启动
    public void     start() throws Exception;

     * 结束
    public void close() throws IOException;


public class SharedCountTest {

    private static final String PATH_COUNTER = "/int/counter";

    class CounterThread extends Thread {

        private final CountDownLatch countDownLatch;
        private final int threadIndex;
        private final SharedCount counter;

        CounterThread(SharedCount counter, int index, CountDownLatch countDownLatch) {
            this.counter = counter;
            this.threadIndex = index;
            this.countDownLatch = countDownLatch;

        public void run() {
            try {
                for (int index = 0; index < 5; index++) {
                    while (true) {
                        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
                        boolean success = counter.trySetCount(counter.getVersionedValue(), counter.getCount() + 1);
                        if (success) {
            } catch (Exception e) {
            } finally {
                try {
                    System.out.println("当前值为:" + counter.getCount());
                } catch (Exception e) {


    public void sharedCount() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 启动10个zookeeper客户端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
            // 启动客户端

        // 如果节点存在,我们就删除节点

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            SharedCount sharedCount = new SharedCount(zookeeperClientList.get(index), PATH_COUNTER, 0);
            sharedCount.addListener(new SharedCountListener() {
                public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
                    System.out.println("计数器值改变,现在的值为:" + newCount);

                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    // 连接状态改变
            new CounterThread(sharedCount, index, countDownLatch).start();

        zookeeperClientList.forEach(curatorFramework -> {
            // 关闭客户端


2.4.2 DistributedAtomicLong(long计数器)


DistributedAtomicLong api 介绍

public class DistributedAtomicLong implements DistributedAtomicNumber<Long> {

     * 构造函数(乐观锁模式)
     * @param client CuratorFramework
     * @param counterPath 节点路径
     * @param retryPolicy 重试策略 -- 乐观加锁
    public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy);

     * 构造函数,retryPolicy(乐观加锁)还没成功,则进行promotedToLock的方式以互斥的方式加锁重试 (排他锁模式)
     * @param client CuratorFramework
     * @param counterPath 节点路径
     * @param retryPolicy 重试策略 -- 乐观加锁
     * @param promotedToLock 排他锁策略
    public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy, PromotedToLock promotedToLock);

     * 获取当前值
    public AtomicValue<Long>     get() throws Exception

     * 强制设置计数值
    public void forceSet(Long newValue) throws Exception;

     * CAS更新(乐观锁模式更新)
    public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) throws Exception;

     * 设置值
    public AtomicValue<Long>   trySet(Long newValue) throws Exception;

     * 如果之前没有初始值,则把初始值设置进去
    public boolean initialize(Long initialize) throws Exception;

     * +1
    public AtomicValue<Long>    increment() throws Exception;

     * -1
    public AtomicValue<Long>    decrement() throws Exception;

     * 加一个指定的值
    public AtomicValue<Long>    add(Long delta) throws Exception;

     * 键一个指定的值
    public AtomicValue<Long> subtract(Long delta) throws Exception;



public class DistributedAtomicLongTest {

    private static final String PATH_COUNTER = "/long/counter";

    class CounterThread extends Thread {

        private final CountDownLatch countDownLatch;
        private final int threadIndex;
        private final DistributedAtomicLong counter;

        CounterThread(DistributedAtomicLong counter, int index, CountDownLatch countDownLatch) {
            this.counter = counter;
            this.threadIndex = index;
            this.countDownLatch = countDownLatch;

        public void run() {
            try {
                for (int index = 0; index < 5; index++) {
                    // 保证成功
                    while (true) {
                        AtomicValue<Long> value = counter.increment();
                        if (value.succeeded()) {
                            System.out.println("succeed: " + value.succeeded() + " value:" + value.postValue());
                        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);

            } catch (Exception e) {
            } finally {


    public void distributedAtomicLong() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 启动10个zookeeper客户端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
            // 启动客户端

        // 如果节点存在,我们就删除节点
        if (zookeeperClientList.get(0).checkExists().forPath(PATH_COUNTER) != null) {

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            // 乐观锁模式
            DistributedAtomicLong count = new DistributedAtomicLong(zookeeperClientList.get(index), PATH_COUNTER, new RetryNTimes(10, 10));
            boolean initializeSuccess = count.initialize(0L);
            if (initializeSuccess) {
            } else {
            new CounterThread(count, index, countDownLatch).start();

        zookeeperClientList.forEach(curatorFramework -> {
            // 关闭客户端


2.5 分布式队列

2.5.4 简单队列 - SimpleDistributedQueue



public class SimpleDistributedQueueTest {

    private static final String SIMPLE_DISTRIBUTED_QUEUE_PATH = "/SimpleDistributedQueue";

    class QueueActionThread extends Thread {

        private final SimpleDistributedQueue queue;
        private final CountDownLatch countDownLatch;
        private final int queueIndex;

        QueueActionThread(SimpleDistributedQueue queue, int index, CountDownLatch countDownLatch) {
            this.queue = queue;
            this.queueIndex = index;
            this.countDownLatch = countDownLatch;

        public void run() {
            try {
                for (int index = 0; index < 5; index++) {
                    String message = "我是队列:" + queueIndex + " 的第-" + index + "-条消息";
                Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
                for (int index = 0; index < 5; index++) {
                    byte[] queueItem = queue.take();
                    System.out.println("我是队列:" + queueIndex + " 我收到了:" + new String(queueItem));
            } catch (Exception e) {
            } finally {


    public void simpleDistributedQueue() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 启动10个zookeeper客户端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
            // 启动客户端

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            SimpleDistributedQueue queue = new SimpleDistributedQueue(zookeeperClientList.get(index), SIMPLE_DISTRIBUTED_QUEUE_PATH);
            new QueueActionThread(queue, index, countDownLatch).start();

        // 关闭客户端


2.5.2 普通队列 - DistributedQueue



public class DistributedQueueTest {

    private static final String DISTRIBUTED_QUEUE_PATH = "/queue/distributedQueue";

    class QueueActionThread extends Thread {

        private final DistributedQueue<String> queue;
        private final CountDownLatch countDownLatch;
        private final int queueIndex;

        QueueActionThread(DistributedQueue<String> queue, int index, CountDownLatch countDownLatch) {
            this.queue = queue;
            this.queueIndex = index;
            this.countDownLatch = countDownLatch;

        public void run() {
            try {
                for (int index = 0; index < 5; index++) {
                    queue.put("队列 " + queueIndex + " 发来的消息:" + index);
                    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
            } catch (Exception e) {
            } finally {


    public void distributedQueue() throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 启动10个zookeeper客户端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
            // 启动客户端

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            QueueBuilder<String> queueBuild = QueueBuilder.builder(zookeeperClientList.get(index), index % 2 == 0 ? new ConsumerImp(index + "") : null, createQueueSerializer(), DISTRIBUTED_QUEUE_PATH);
            DistributedQueue<String> queue = queueBuild.buildQueue();
            new QueueActionThread(queue, index, countDownLatch).start();

        // 关闭客户端


     * 队列消息序列化实现类
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            public byte[] serialize(String item) {
                return item.getBytes();

            public String deserialize(byte[] bytes) {
                return new String(bytes);

    private class ConsumerImp implements QueueConsumer<String>{

        private String consumerName;

        public ConsumerImp(String consumerName) {
            this.consumerName = consumerName;

        public void consumeMessage(String message) throws Exception {
            System.out.println(consumerName + " 收到消息: " + message);

        public void stateChanged(CuratorFramework client, ConnectionState newState) {



2.5.3 带id的队列 - DistributedIdQueue


2.5.4 优先级队列 - DistributedPriorityQueue


2.5.4 延迟队列 - DistributedDelayQueue


2.6 分布式屏障 - Barrier

       分布式Barrier是这样一个功能:它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点继续进行。

2.6.1 DistributedBarrier


DistributedBarrier api 介绍。

public class DistributedBarrier {

     * @param client CuratorFramework
     * @param barrierPath barrier路径节点
    public DistributedBarrier(CuratorFramework client, String barrierPath);

     * 设置栅栏,它将阻塞在它上面等待的线程:
    public synchronized void         setBarrier() throws Exception;

     * 设置栅栏
    public synchronized void      removeBarrier() throws Exception;

     * 等待放行条件
    public synchronized void      waitOnBarrier() throws Exception
    public synchronized boolean      waitOnBarrier(long maxWait, TimeUnit unit) throws Exception;


public class DistributedBarrierTest {

    private static final String BARRIER_PATH_COUNTER = "/barrier";

    class LogicThread extends Thread {

        private final CountDownLatch countDownLatch;
        private final int threadIndex;
        private final DistributedBarrier barrier;

        LogicThread(DistributedBarrier barrier, int index, CountDownLatch countDownLatch) {
            this.barrier = barrier;
            this.threadIndex = index;
            this.countDownLatch = countDownLatch;

        public void run() {
            try {
                System.out.println("线程: " + threadIndex + " 请求进入");
                // 阻塞等待
                System.out.println("线程: " + threadIndex + " 成功进入");
            } catch (Exception e) {
            } finally {


    public void distributedBarrier() throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 启动10个zookeeper客户端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
            // 启动客户端

        // 如果节点存在,我们就删除节点
        if (zookeeperClientList.get(0).checkExists().forPath(BARRIER_PATH_COUNTER) != null) {
        DistributedBarrier controlBarrier = new DistributedBarrier(zookeeperClientList.get(0), BARRIER_PATH_COUNTER);
        for (int index = 0; index < zookeeperClientList.size(); index++) {
            DistributedBarrier barrier = new DistributedBarrier(zookeeperClientList.get(index), BARRIER_PATH_COUNTER);
            new LogicThread(barrier, index, countDownLatch).start();

        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
        zookeeperClientList.forEach(curatorFramework -> {
            // 关闭客户端



2.6.2 DistributedDoubleBarrier

       DistributedDoubleBarrier:类似CyclicBarrier 。允许多个分布式线程等待,等线程个数达到了指定数量的时候,就可以同时执行或者同时退出了。

DistributedDoubleBarrier api 的使用

public class DistributedDoubleBarrier {

     * 构造函数,
     * memberQty是成员数量,当enter()方法被调用时,成员被阻塞,直到所有的成员都调用了enter()
     * 当leave()方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave()
    public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty);

     * 进入栅栏并且阻塞,直到所有的成员都进入
    public void     enter() throws Exception;
    public boolean     enter(long maxWait, TimeUnit unit) throws Exception;

     * 退出栅栏并且阻塞,知道所有的成员都退出
    public synchronized void     leave() throws Exception;
    public synchronized boolean     leave(long maxWait, TimeUnit unit) throws Exception;


public class DistributedDoubleBarrierTest {

    private static final String BARRIER_PATH_COUNTER = "/barrier";

    class LogicThread extends Thread {

        private final CountDownLatch countDownLatch;
        private final int threadIndex;
        private final DistributedDoubleBarrier barrier;

        LogicThread(DistributedDoubleBarrier barrier, int index, CountDownLatch countDownLatch) {
            this.barrier = barrier;
            this.threadIndex = index;
            this.countDownLatch = countDownLatch;

        public void run() {
            try {
                Uninterruptibles.sleepUninterruptibly(5 * threadIndex, TimeUnit.SECONDS);
                System.out.println("线程:" + threadIndex + " 请求进入");
                System.out.println("线程:" + threadIndex + " 成功进入");

                System.out.println("线程:" + threadIndex + " 请求离开");
                System.out.println("线程:" + threadIndex + " 成功离开");
            } catch (Exception e) {
            } finally {


    public void distributedDoubleBarrier() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 启动10个zookeeper客户端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
            // 启动客户端

        // 如果节点存在,我们就删除节点
        if (zookeeperClientList.get(0).checkExists().forPath(BARRIER_PATH_COUNTER) != null) {

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(zookeeperClientList.get(index), BARRIER_PATH_COUNTER, 5);
            new LogicThread(barrier, index, countDownLatch).start();

        zookeeperClientList.forEach(curatorFramework -> {
            // 关闭客户端


三 Spring Boot使用Curator

       Spring Boot中使用Curator,我们要想办法创建一个zookeeper客户端.然后把这个客户端对象添加到Spring容器中去.这样我们就可以在各个地方拿到这个zookeeper客户端对象.


public class ZkClient {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

     * zookeeper客户端实例
    private CuratorFramework client;
     * 服务器列表,格式host1:port1,host2:port2,...
    private String zookeeperServer;
     * 会话超时时间,单位毫秒,默认60000ms
    private int sessionTimeoutMs;
     * 连接创建超时时间,单位毫秒,默认60000ms
    private int connectionTimeoutMs;
     * 重试之间等待的初始时间
    private int baseSleepTimeMs;
     * 当连接异常时的重试次数
    private int maxRetries;
     * 为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间
    private String namespace;

    public void setZookeeperServer(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;

    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;

    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    public void setBaseSleepTimeMs(int baseSleepTimeMs) {
        this.baseSleepTimeMs = baseSleepTimeMs;

    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;

    public void setNamespace(String namespace) {
        this.namespace = namespace;

     * spring 自动调用,不需要我们主动调用
    public void init() {
        // 创建客户端
        // 重连规则
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory.builder()
        // 启动客户端,连接服务器

     * spring 自动调用,不需要我们主动调用
    public void stop() {
        // 关闭客户端

     * 获取 zookeeper 客户端对象
     * @return CuratorFramework
    public CuratorFramework getClient() {
        return client;



public class ZkConfiguration {

     * 服务器列表,格式host1:port1,host2:port2,...
    private String zookeeperServer;
     * 会话超时时间,单位毫秒,默认60000ms
    private int sessionTimeoutMs;
     * 连接创建超时时间,单位毫秒,默认60000ms
    private int connectionTimeoutMs;
     * 当连接异常时的重试次数
    private int maxRetries;
     * 重试之间等待的初始时间
    private int baseSleepTimeMs;
     * 为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间
    private String namespace;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public ZkClient zkClient() {
        ZkClient zkClient = new ZkClient();
        return zkClient;



# zeekeeper配置
  server: # 服务器列表,格式host1:port1,host2:port2,...
  sessionTimeoutMs: 6000 # 会话超时时间,单位毫秒,默认60000ms
  connectionTimeoutMs: 6000 # 连接创建超时时间,单位毫秒,默认60000ms
  maxRetries: 3 # 当连接异常时的重试次数
  baseSleepTimeMs: 1000 # 重试之间等待的初始时间
  namespace: lock # 为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间,不需要的时候可以去掉


       到此关于java zookeeper客户端Curator的使用部分就讲完了.文章中设计到的所有实例代码在 https://github.com/tuacy/java-study工程目录的zookeeper文件下面可以找到.



