美文网首页
ZooKeeper客户端Curator使用

ZooKeeper客户端Curator使用

作者: tuacy | 来源:发表于2019-08-18 22:32 被阅读0次

           Curator是Netflix公司开源的一套zookeeper客户端框架.解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Curator被看做是zookeeper客户端框里面的瑞士军刀(牛逼了)。Curator使得我们开发zookeeper客户端程序变的很容易。

           Curator框架包含三个主要的包:

    • curator-framework:对zookeeper的底层api的一些封装。
    • curator-client:提供一些客户端的操作,例如重试策略等。
    • curator-recipes:封装了一些高级特性,例如Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

           Curator的引入(pom方式,版本可能有变化)。

            <!-- zookeeper -->
            <!-- 对zookeeper的底层api的一些封装 -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.1</version>
            </dependency>
            <!-- 提供一些客户端的操作,例如重试策略等 -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-client</artifactId>
                <version>4.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等 -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.10</version>
                <!--排除这个slf4j-log4j12-->
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <artifactId>log4j</artifactId>
                        <groupId>log4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    

    一 Curator的基本用法

    1.1 创建zookeeper客户端

           在Curator中CuratorFramework对象就代表一个zookeeper客户端。所以创建创建zookeeper客户端就是创建CuratorFramework对象。CuratorFramework对象又可以通过CuratorFrameworkFactory来创建。

    CuratorFramework api介绍如下

    public interface CuratorFramework {
    
        /**
         * 启动zookeeper客户端
         */
        public void start();
    
        /**
         * 关闭zookeeper客户端
         */
        public void close();
    
        /**
         * 返回客户端状态:LATENT、STARTED、STOPPED
         */
        public CuratorFrameworkState getState();
    
        /**
         * 客户端是否已经启动
         */
        @Deprecated
        public boolean isStarted();
    
        /**
         * 创建节点的建造器
         */
        public CreateBuilder create();
    
        /**
         * 删除节点的建造器
         */
        public DeleteBuilder delete();
    
        /**
         * 检查节点是否存在的建造器
         */
        public ExistsBuilder checkExists();
    
        /**
         * 获取接连数据的建造器
         */
        public GetDataBuilder getData();
    
        /**
         * 设置节点数据的建造器
         */
        public SetDataBuilder setData();
    
        /**
         * 获取子节点的建造器
         */
        public GetChildrenBuilder getChildren();
    
        /**
         * 获取权限的构造器
         */
        public GetACLBuilder getACL();
    
        /**
         * 设置权限的构造器
         */
        public SetACLBuilder setACL();
    
        /**
         * 重新配置的建造器
         */
        public ReconfigBuilder reconfig();
    
        /**
         * 获取配置的建造器
         */
        public GetConfigBuilder getConfig();
    
        /**
         * 事务构造器
         * @deprecated use {@link #transaction()} instead
         */
        public CuratorTransaction inTransaction();
    
        /**
         * 事务构造器
         */
        public CuratorMultiTransaction transaction();
    
        /**
         * 分配可与{transaction()}一起使用的操作
         */
        public TransactionOp transactionOp();
    
    
        /**
         * 如果路径不存在,则创建路径对应的节点
         */
        public void createContainers(String path) throws Exception;
    
        /**
         * 启动同步构建器。注意:即使您不使用其中一种background()方法,同步也始终在后台
         */
        public SyncBuilder sync();
    
        /**
         * 启动remove watch builder,有节点删除的时候会调用
         */
        public RemoveWatchesBuilder watches();
    
        /**
         *
         * 返回Connect State的可侦听接口
         */
        public Listenable<ConnectionStateListener> getConnectionStateListenable();
    
        /**
         * 返回事件的可侦听接口
         */
        public Listenable<CuratorListener> getCuratorListenable();
    
        /**
         * 返回未处理错误的可侦听接口
         */
        public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();
    
        /**
         * 返回一个新的CuratorFramework,该CuratorFramework指定了一个新的命名空间
         */
        public CuratorFramework usingNamespace(String newNamespace);
    
        /**
         * 获取命名空间
         */
        public String getNamespace();
    
        /**
         * 返回托管的zookeeper客户端
         */
        public CuratorZookeeperClient getZookeeperClient();
    
        /**
         * 阻塞,直到与ZooKeeper的连接可用或已超过maxWaitTime
         */
        public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
    
        /**
         * 阻塞,直到与ZooKeeper的连接可用。在连接可用或中断之前,此方法不会返回,在这种情况下,将抛出InterruptedException
         */
        public void blockUntilConnected() throws InterruptedException;
    
        /**
         * 返回跟踪观察者创建的当前实例的外观,并允许一次性删除所有观察者
         */
        public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework();
    
        /**
         * 返回配置的错误策略
         */
        public ConnectionStateErrorPolicy getConnectionStateErrorPolicy();
    
        /**
         *
         * Current维护Zookeeper仲裁配置的缓存视图。
         */
        public QuorumVerifier getCurrentConfig();
    
        /**
         * 获取SchemaSet
         */
        SchemaSet getSchemaSet();
    
        /**
         * 如果此实例在ZK 3.4.x兼容模式下运行,则返回true
         */
        boolean isZk34CompatibilityMode();
    
    }
    

    CuratorFrameworkFactory api介绍如下

    public class CuratorFrameworkFactory {
    
        /**
         * 用于通过建造者模式创建zookeeper客户端
         */
        public static Builder builder();
    
        /**
         * 创建zookeeper客户端
         */
        public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy);
    
        /**
         * 创建zookeeper客户端
         */
        public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);
    
        /**
         * 将本地地址作为可用作节点有效负载的字节返回
         */
        public static byte[] getLocalAddress();
    
        public static class Builder {
    
            /**
             * build CuratorFramework对象 -- zookeeper客户端
             */
            public CuratorFramework build();
    
            /**
             * 创建一个临时的CuratorFramework客户端,CuratorFramework,默认3分钟不活动客户端连接就被关闭
             */
            public CuratorTempFramework buildTemp();
    
            /**
             * 创建一个临时的CuratorFramework客户端,CuratorFramework,可以自己指定多长时间不活动客户端连接就被关闭
             */
            public CuratorTempFramework buildTemp(long inactiveThreshold, TimeUnit unit);
    
            /**
             * 添加zookeeper 访问权限
             */
            public Builder authorization(String scheme, byte[] auth);
    
            public Builder authorization(List<AuthInfo> authInfos);
    
            /**
             * 设置zookeeper服务器列表
             */
            public Builder connectString(String connectString);
    
            /**
             * zookeeper服务器地址通过EnsembleProvider(配置提供者)来提供,不能和connectString共同使用
             */
            public Builder ensembleProvider(EnsembleProvider ensembleProvider);
    
            /**
             * 为每次新建的节点设置一个默认值
             */
            public Builder defaultData(byte[] defaultData);
    
            /**
             * 设置命名空间,为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间
             */
            public Builder namespace(String namespace)
    
            /**
             * 会话超时时间,单位毫秒,默认60000ms
             */
            public Builder sessionTimeoutMs(int sessionTimeoutMs);
    
            /**
             * 连接创建超时时间,单位毫秒,默认60000ms
             */
            public Builder connectionTimeoutMs(int connectionTimeoutMs);
    
            /**
             * @param maxCloseWaitMs time to wait during close to join background threads
             * @return this
             */
            public Builder maxCloseWaitMs(int maxCloseWaitMs);
    
            /**
             * 设置客户端重连策略
             */
            public Builder retryPolicy(RetryPolicy retryPolicy);
    
            /**
             * Executor Services的线程工厂
             */
            public Builder threadFactory(ThreadFactory threadFactory);
    
            /**
             * 压缩器,用于压缩和解压数据
             */
            public Builder compressionProvider(CompressionProvider compressionProvider);
    
            /**
             * ZookeeperFactory 用于创建ZooKeeper
             */
            public Builder zookeeperFactory(ZookeeperFactory zookeeperFactory);
    
            /**
             * 权限控制器
             */
            public Builder aclProvider(ACLProvider aclProvider);
    
            /**
             * 设置只读模式
             */
            public Builder canBeReadOnly(boolean canBeReadOnly);
    
            /**
             * 不让客户端,创建节点的时候顺带创建父节点
             */
            public Builder dontUseContainerParents();
    
            /**
             * 默认是StandardConnectionStateErrorPolicy,设置要使用的错误策略
             */
            public Builder connectionStateErrorPolicy(ConnectionStateErrorPolicy connectionStateErrorPolicy);
    
            /**
             * 如果mode为true,则创建ZooKeeper 3.4.x兼容客户端。如果使用的客户端库是ZooKeeper 3.4.x 默认情况下已启用
             */
            public Builder zk34CompatibilityMode(boolean mode);
    
            /**
             * 更改连接处理策略,默认StandardConnectionHandlingPolicy
             */
            public Builder connectionHandlingPolicy(ConnectionHandlingPolicy connectionHandlingPolicy);
    
            /**
             * 添加强制架构集
             */
            public Builder schemaSet(SchemaSet schemaSet);
        }
    }
    

           从上面的CuratorFrameworkFactory api的介绍可以看出CuratorFrameworkFactory对象的创建有两种方式:

    • 通过过构造函数创建
    参数 类型 含义
    connectString String 服务器列表,格式host1:port1,host2:port2,…
    sessionTimeoutMs int 会话超时时间,单位毫秒,默认60000ms
    connectionTimeoutMs int 连接创建超时时间,单位毫秒,默认60000ms
    retryPolicy RetryPolicy 重试策略,curator已经提供了多种重试策略,也可以自行实现RetryPolicy接口

    curator提供的重试策略有:ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed

    • 通过build创建,关于build里面的各个参数在CuratorFrameworkFactory api里面都顺带介绍了哦。

           比如如下的实例代码,连接到127.0.0.1:2181服务端。

    关于zookeeper的安装大家可以自己去网上搜下。

            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
    

    1.2 启动客户端

           调用start()方法启动客户端。这个时候zookeeper客户端才会去连接zookeeper服务端。在zookeeper客户端上做的所有动作都需要在start()之后执行。如果你不想连接服务端的时候可以调用close()方法断开连接.

    1.3 节点操作

           首先我们要明确zookeeper里面的节点结构类似于我们文件系统的结构(就像一棵树样的)。除此之外zookeeper的每个节点上还可以保存数据。zookeeper里面的节点有四种,不同的节点类型都有自己的特点:

    • CreateMode.PERSISTENT:持久化节点。
    • CreateMode.PERSISTENT_SEQUENTIAL:持久化并且带序列号节点。
    • CreateMode.EPHEMERAL:临时节点(客户端断开了节点也就删除了)
    • CreateMode.EPHEMERAL_SEQUENTIAL:临时并且带序列号(客户端断开了节点也就删除了)

    1.3.1 创建节点

           创建节点很简单,我们前面已经创建了zookeeper客户端,并且调了start()方法把客户端启动起来了。

           比如我们可以用如下的代码创建一个持久化的节点。通过withMode(CreateMode.PERSISTENT)来指定节点的类型。

        /**
         * 同步 创建持久化节点
         *
         * @param path 节点路径
         * @throws Exception errors
         */
        public void createPersistentNodeSync(String path) throws Exception {
            client.create()
                    .creatingParentContainersIfNeeded() // 自动递归创建父节点
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path);
        }
    

           我们也可以在创建节点的同时,给节点设置数据。

    
        /**
         * 同步-创建持久化节点
         *
         * @param path 节点路径
         * @param data 节点对应的值
         * @throws Exception errors
         */
        public void createPersistentNodeSync(String path, byte[] data) throws Exception {
            client.create()
                    .creatingParentContainersIfNeeded() // 自动递归创建父节点
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, data);
    
        }
    

    1.3.2 删除节点

           删除叶子节点。(如果不是叶子节点是会报异常的)

        /**
         * 同步-删除一个叶子节点(注意哦,只能删除叶子节点否则报错的)
         *
         * @param path 需要删除的节点对应的路径
         * @throws Exception errors
         */
        public void deleteNodeSync(String path) throws Exception {
            client.delete()
                    .forPath(path);
        }
    

           我们也可以删除整个节点(包括节点下的子节点)。

        /**
         * 同步-删除一个节点,并且递归删除其所有的子节点
         *
         * @param path 需要删除的节点对应的路基
         * @throws Exception errors
         */
        public void deleteNodeRecursivelySync(String path) throws Exception {
            client.delete()
                    .deletingChildrenIfNeeded()
                    .forPath(path);
    
        }
    

    1.3.3 判断节点是否存在

           通过节点的Stat来判断节点是否存在。

        /**
         * 同步-检查节点是否存在
         *
         * @param path 节点路径
         * @return 节点是否存在
         * @throws Exception errors
         */
        public boolean isNodeExistSync(String path) throws Exception {
            Stat state = client.checkExists()
                    .forPath(path);
            return state != null;
        }
    

    1.3.4 节点数据操作

            读取节点数据。

        /**
         * 同步-读取一个节点的数据内容
         *
         * @param path 节点路基
         * @return 节点内容
         * @throws Exception errors
         */
        public byte[] getNodeDataSync(String path) throws Exception {
            return client.getData()
                    .forPath(path);
        }
    

            更新节点数据,或者设置null删除节点数据

        /**
         * 同步-更新一个节点的数据内容
         *
         * @param path 节点路径
         * @param data 节点对应数据
         * @throws Exception errors
         */
        public void updateNodeDataSync(String path, byte[] data) throws Exception {
            client.setData()
                    .forPath(path, data);
        }
    

    1.3.5 获取节点的所有子节点

        /**
         * 同步-获取某个节点的所有子节点路径
         *
         * @param path 目录
         * @return children
         * @throws Exception errors
         */
        public List<String> getChildrenSync(String path) throws Exception {
            return client.getChildren()
                    .forPath(path);
        }
    

    1.4 事务

           事务相信大家都非常的熟悉。Curator也提供了事务的支持,一组crud操作要么都成功,要么都失败。使用起来也非常的简单。

           一个事务里面肯定是有多个操作的,我们首先要把每个操作都封装成CuratorOp。比如如下的实例,我们把多个操作放到一个事务里面去执行.

        @Test
        public void transaction() throws Exception {
            CuratorOp createOp = client.transactionOp().create().forPath("/a/path", "some data".getBytes());
            CuratorOp setDataOp = client.transactionOp().setData().forPath("/another/path", "other data".getBytes());
            CuratorOp deleteOp = client.transactionOp().delete().forPath("/yet/another/path");
    
            Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp, setDataOp, deleteOp);
            for (CuratorTransactionResult result : results) {
                System.out.println(result.getForPath() + " - " + result.getType());
            }
        }
    

    1.5 异步操作

           因为zookeeper客户端的操作都是在和zookeeper服务端打交道的。涉及到网络的调用。所以有些操作的响应就不会那么及时了。Curator就给提供了异步操作。异步响应操作结果。

           既然是异步操作,那么肯定需要BackgroundCallback来异步接收操作结果了。关于异步操作,我们也举一个简单的例子,我们以创建节点来举例(删除节点,修改节点数据,事务等等其他操作都是一样的使用)。

        /**
         * 异步-获取某个节点的所有子节点路径
         *
         * @param path     目录
         * @param callback 回调
         * @throws Exception errors
         */
        public void getChildrenAsync(String path, BackgroundCallback callback) throws Exception {
            client.getChildren()
                    .inBackground(callback)
                    .forPath(path);
        }
    
        /**
         * 异步-获取某个节点的所有子节点路径
         *
         * @param path     目录
         * @param callback 回调
         * @param executor 回调在哪里执行
         * @throws Exception errors
         */
        public void getChildrenAsync(String path, BackgroundCallback callback, Executor executor) throws Exception {
            client.getChildren()
                    .inBackground(callback, executor)
                    .forPath(path);
    
        }
    

    二 Curator高级特性

           Curator里面的curator-recipes ja包封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等等。而且这些特性都是在分布式系统里面常用的功能了。

    2.1 Cache事件监听

           Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。

    2.1.1 Path Cache

           Path Cache用来监控子节点.当一个子节点增加, 更新,删除时, Path Cache会改变它的状态,会包含最新子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

           Path Cache的使用非常的简单,主要涉及到四个类:

    • PathChildrenCache:Path Cache听实现类
    • PathChildrenCacheEvent:子节点事件
    • PathChildrenCacheListener: 子节点监听
    • ChildData:子节点信息

           关于Path Cache的使用,我们用一个实例来简单的说明下,实例里面也只是简单的创建了一个节点。最终监听到节点的创建.

        @Test
        public void pathChildrenCache() throws Exception {
    
            // 创建zookeeper客户端
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 启动客户端
            client.start();
    
            PathChildrenCache cache = new PathChildrenCache(client, "/tuacy/pathCache", true);
            // 添加监听
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    System.out.println("事件类型:" + event.getType());
                    if (null != event.getData()) {
                        System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
                    }
                }
            });
            cache.start();
            // 添加节点
            client.create().creatingParentContainersIfNeeded().forPath("/tuacy/pathCache/001");
            Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
            cache.close();
        }
    

    2.1.2 Node Cache

           Node Cache与Path Cache类似,Node Cache只是监听某一指定的节点。子节点的变化它是不会管的。

           Node Cache的使用涉及到下面的三个类:

    • NodeCache - Node Cache实现类
    • NodeCacheListener - 节点监听器
    • ChildData - 节点数据

           我们还是用一个简单的实例来说明。

        @Test
        public void nodeCache() throws Exception {
    
            // 创建zookeeper客户端
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 启动客户端
            client.start();
    
            final NodeCache cache = new NodeCache(client, "/tuacy/nodeCache");
            cache.start();
            // 添加监听
            cache.getListenable().addListener(new NodeCacheListener() {
    
                @Override
                public void nodeChanged() throws Exception {
                    ChildData data = cache.getCurrentData();
                    if (null != data) {
                        System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
                    } else {
                        System.out.println("节点被删除!");
                    }
                }
            });
            // 添加节点
            client.create().creatingParentsIfNeeded().forPath("/tuacy/nodeCache");
            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
            client.setData().forPath("/tuacy/nodeCache", "abc".getBytes());
            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
            client.delete().forPath("/tuacy/nodeCache");
            Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
            cache.close();
        }
    

    2.1.3 Tree Cache

           Tree Cache可以监控整个树上的所有节点,就是PathCache和NodeCache的组合功能。

           Tree Cache的使用涉及到下面四个类。

    • TreeCache - Tree Cache实现类
    • TreeCacheListener - 监听器类
    • TreeCacheEvent - 触发的事件类
    • ChildData - 节点数据

           我们还是以具体的实例来说明Tree Cache的使用。

        @Test
        public void nodeCache() throws Exception {
    
            // 创建zookeeper客户端
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 启动客户端
            client.start();
            final TreeCache cache = TreeCache.newBuilder(client, "/tuacy/treeCache")
                    .setCacheData(true)
                    .build();
            // 添加监听
            cache.getListenable().addListener(new TreeCacheListener() {
    
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    System.out.println("事件类型:" + event.getType() + " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
                }
            });
            cache.start();
            // 添加节点
            client.create().creatingParentsIfNeeded().forPath("/tuacy/treeCache");
            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
            // 给节点设置数据
            client.setData().forPath("/tuacy/treeCache", "abc".getBytes());
            // 创建子节点
            client.create().creatingParentsIfNeeded().forPath("/tuacy/treeCache/001");
            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
            // 修改子节点的数据
            client.setData().forPath("/tuacy/treeCache/001", "abc".getBytes());
            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
            // 删除子节点
            client.delete().forPath("/tuacy/treeCache/001");
            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
            // 删除节点
            client.delete().forPath("/tuacy/treeCache");
            Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
            cache.close();
            client.close();
        }
    

    2.2 Leader选举

           在分布式系统中,选主是一个很常见的场景(Leader,Slaver真的真的是非常的常见)。

    • 主节点是唯一的。
    • 各个节点获取主节点的概率是一样的,一旦某个节点被选为了主节点(Leader),其他的从节点(Slaver)也要能感知到。
    • 一旦主节点断开,其他的从节点重新选出一个主节点。

    2.2.1 LeaderLatch

           在不同的zookeeper客户端,使用了相同latch path的LeaderLatch,当中的一个最终会被选举为leader,可以通过hasLeadership方法查看LeaderLatch实例是否leade。也可以在LeaderLatchListener里面监听当前节点是否是leader。使用LeaderLatch的时候如果不想参与选举了要调用close()方法退出选举。

    LeaderLatch api介绍

    public class LeaderLatch {
    
        /**
         * 构造函数
         *
         * @param client    CuratorFramework
         * @param latchPath 路径,所有参与者同一个路径
         */
        public LeaderLatch(CuratorFramework client, String latchPath);
        public LeaderLatch(CuratorFramework client, String latchPath, String id);
        public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode);
    
        /**
         * 参与选举
         */
        public void start() throws Exception;
    
        /**
         * 退出选举
         */
        @Override
        public void close() throws IOException;
    
        /**
         * 退出选举
         * 关闭方式:SILENT : 静默关闭,不触发相关监听器、NOTIFY_LEADER :关闭时触发监听器
         */
        public synchronized void close(CloseMode closeMode) throws IOException;
    
    
        /**
         * 添加监听器,监听是否当选为leader
         */
        public void addListener(LeaderLatchListener listener);
        public void addListener(LeaderLatchListener listener, Executor executor);
    
        /**
         * 移除监听器
         */
        public void removeListener(LeaderLatchListener listener);
    
        /**
         * 尝试让当前LeaderLatch实例为leader
         */
        public void await() throws InterruptedException, EOFException
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    
        /**
         * 获取构造函数里面这是的id
         */
        public String getId();
    
        /**
         * 获取当前LeaderLatch实例的状态
         */
        public State getState();
    
        /**
         * 返回所有的参与者
         */
        public Collection<Participant> getParticipants() throws Exception;
    
        /**
         * 返回当前leader节点信息
         */
        public Participant getLeader() throws Exception;
    
        /**
         * 判断实例是否是leader
         */
        public boolean hasLeadership();
    
    }
    

           我们用一个简单的实例来说明LeaderLatch用法,比如我们创建10个zookeeper客户端来进行选举。

    
        @Test
        public void leaderLatch() throws Exception {
    
            List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
            List<LeaderLatch> leaderLatchList = Lists.newArrayList();
    
            // 启动10个zookeeper客户端
            for (int index = 0; index < 10; index++) {
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(6000)
                        .connectionTimeoutMs(6000)
                        .build();
                // 启动客户端
                client.start();
                zookeeperClientList.add(client);
            }
    
            // 这里我们所有的客户端都参与leader选举
            for (int index = 0; index < zookeeperClientList.size(); index++) {
                // 所有的客户端都参与leader选举
                final LeaderLatch latch = new LeaderLatch(zookeeperClientList.get(index), LEADER_PATH, index + "");
                latch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        System.out.println("我是leader: " + latch.getId());
                    }
    
                    @Override
                    public void notLeader() {
                        System.out.println("我不是leader: " + latch.getId());
                    }
                });
                latch.start();
                leaderLatchList.add(latch);
            }
    
            // 30S之后
            Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
            // 我们找到谁是leader
            String leaderId = leaderLatchList.get(0).getLeader().getId();
            System.out.println("当前leader id : " + leaderId);
            leaderLatchList.forEach(item -> {
                // 这里我们吧leader退出选举,让剩下的重新选举
                if (item.getId().equals(leaderId)) {
                    try {
                        item.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
    
    
            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
            leaderLatchList.forEach(curatorFramework -> {
                // 退出选举
                try {
                    curatorFramework.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            zookeeperClientList.forEach(curatorFramework -> {
                // 关闭客户端
                curatorFramework.close();
            });
    
    
        }
    

    2.2.2 LeaderSelector

           LeaderSelector也是一个用于分布式选举的类,相对于LeaderLatch来说,LeaderSelector更加的灵活点。LeaderSelector使用的时候主要涉及下面几个类:

    • LeaderSelector:LeaderSelector选举实例类。
    • LeaderSelectorListener:监听选举状态和连接状态
    • LeaderSelectorListenerAdapter:实现了LeaderSelectorListener接口的一个抽象类,封装了客户端与zk服务器连接挂起或者断开时的处理逻辑(抛出抢主失败CancelLeadershipException),一般监听器推荐实现该类。
    • CancelLeadershipException: 抢主失败异常。

    LeaderSelector api 介绍

    public class LeaderSelector {
    
        /**
         * 构造函数
         * @param client     CuratorFramework
         * @param leaderPath 路径
         * @param listener   监听器
         */
        public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
        public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
        public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);
    
    
        /**
         * 保证在此实例释放领导权之后还可能获得领导权
         */
        public void autoRequeue();
    
        /**
         * 设置获取当前实例对应的id
         */
        public void setId(String id);
        public String getId();
    
        /**
         * 当前实例参与选举
         */
        public void start();
    
        /**
         * 重新键入到参与者队列里面去选举,如果此实例已在参与者排队里面,则不会发生任何操作并返回false。如果实例未排队,则重新执行该操作并返回true
         */
        public boolean requeue();
    
        /**
         * 退出选举
         */
        public synchronized void close();
    
        /**
         * 获取所有的参与者
         */
        public Collection<Participant> getParticipants() throws Exception;
    
        /**
         * 获取leader
         */
        public Participant getLeader() throws Exception;
    
        /**
         * 当前节点是否是leader
         */
        public boolean hasLeadership();
    
        /**
         * 如果当前实例是leader的话,尝试终断领导权
         */
        public synchronized void interruptLeadership();
    
    
    }
    
    

    ConnectionStateListener、LeaderSelectorListener

    public interface ConnectionStateListener {
        /**
         * 监听网络连接问题
         */
        public void stateChanged(CuratorFramework client, ConnectionState newState);
    }
    
    /**
     * Notification for leadership
     *
     * @see LeaderSelector
     */
    public interface LeaderSelectorListener extends ConnectionStateListener {
        /**
         * 当前节点获取到leader权之后调用,注意:在您希望释放领导力之前,此方法不应返回
         * 所以说如果你想一直占有leader权利,就在里面写个无限循环吧
         */
        public void         takeLeadership(CuratorFramework client) throws Exception;
    }
    

    LeaderSelectorListenerAdapter

    /**
     * 实现了LeaderSelectorListener接口的一个抽象类,封装了客户端与zk服务器连接挂起或者断开时的处理逻辑(抛出抢主失败CancelLeadershipException),一般监听器推荐实现该类
     */
    public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
        /**
         * 当遇到SUSPENDED以及LOST时直接抛出CancelLeadershipException从而去引发LeaderSelector.interruptLeadership()调用
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if ( client.getConnectionStateErrorPolicy().isErrorState(newState) ) {
                throw new CancelLeadershipException();
            }
        }
    }
    

           我们还是用一个简单的实例来说明LeaderSelector的用法,我们还是创建10个zookeeper客户端。并且我们创建一个LeaderSelectorAdapter类,在里面当是leader之后的一些处理,如果是leader 10s之后,释放leader权力重新选举。

    public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter {
    
        private final LeaderSelector leaderSelector;
    
        public LeaderSelectorAdapter(CuratorFramework client, String path, String id) {
            // 创建一个LeaderSelector对象
            leaderSelector = new LeaderSelector(client, path, this);
            // 设置id
            leaderSelector.setId(id);
            // 保证在此实例释放领导权之后还可能获得领导权
            leaderSelector.autoRequeue();
        }
    
        /**
         * 参与选举
         */
        public void start() {
            // 参与选举
            leaderSelector.start();
        }
    
        /**
         * 退出选举
         */
        public void close() {
            // 退出选举
            leaderSelector.close();
        }
    
        /**
         * 当获得leader的时候,这个方法会被调用。如果还想继续当leader,这个方法不能返回。如果你想要要此实例一直是leader的话可以加一个死循环
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            System.out.println(leaderSelector.getId() + " 是leader");
            try {
                // 当上leader 5s之后,释放leader权利
                Thread.sleep(TimeUnit.SECONDS.toMillis(10));
            } catch (InterruptedException e) {
                System.err.println(leaderSelector.getId() + " 被中断.");
                Thread.currentThread().interrupt();
            } finally {
                System.out.println(leaderSelector.getId() + " 释放leader的权力。");
            }
        }
    }
    
        private static final String LEADER_PATH = "/tuacy/leaderSelector";
    
        @Test
        public void leaderSelector() throws Exception {
            List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
            List<LeaderSelectorAdapter> leaderLatchList = Lists.newArrayList();
    
            // 启动10个zookeeper客户端
            for (int index = 0; index < 10; index++) {
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(6000)
                        .connectionTimeoutMs(6000)
                        .build();
                // 启动客户端
                client.start();
                zookeeperClientList.add(client);
            }
    
            // 这里我们所有的客户端都参与leader选举
            for (int index = 0; index < zookeeperClientList.size(); index++) {
                // 所有的客户端都参与leader选举
                final LeaderSelectorAdapter latch = new LeaderSelectorAdapter(zookeeperClientList.get(index), LEADER_PATH, index + "");
                latch.start();
                leaderLatchList.add(latch);
            }
    
            // 1分钟之后关掉程序
            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
            leaderLatchList.forEach(curatorFramework -> {
                // 退出选举
                curatorFramework.close();
    
            });
            zookeeperClientList.forEach(curatorFramework -> {
                // 关闭客户端
                curatorFramework.close();
            });
        }
    

    2.3 分布式锁

           分布式锁也是咱们分布式系统里面非常常见的功能了。Curator直接就帮我们做到了,省的我们自己去实现分布式锁。

    2.3.1 InterProcessMutex

           InterProcessMutex公平锁、可重入锁。和ReentrantLock类似。

    InterProcessMutex api 介绍

    public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    
        /**
         * InterProcessMutex的构造函数,
         */
        public InterProcessMutex(CuratorFramework client, String path);
        public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);
    
        /**
         * 申请获取锁
         */
        @Override
        public void acquire() throws Exception;
        @Override
        public boolean acquire(long time, TimeUnit unit) throws Exception;
    
        /**
         *
         * 如果此JVM中的线程获取了互斥锁,则返回true
         */
        @Override
        public boolean isAcquiredInThisProcess();
    
        /**
         * 释放锁
         */
        @Override
        public void release() throws Exception;
    
        /**
         * 返回所有参与获取锁的所有当前节点的排序列表
         */
        public Collection<String> getParticipantNodes() throws Exception;
    
        /**
         * 将锁设为可撤销的. 当别的进程或线程想让你释放锁是Listener会被调用
         */
        @Override
        public void makeRevocable(RevocationListener<InterProcessMutex> listener);
        @Override
        public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor);
    
        /**
         * 如果调用线程获取互斥锁,则返回true
         */
        public boolean isOwnedByCurrentThread();
    
    }
    

    2.3.2 InterProcessSemaphoreMutex

           InterProcessSemaphoreMutex不可重入锁。

    InterProcessSemaphoreMutex api介绍

    public class InterProcessSemaphoreMutex implements InterProcessLock {
    
        /**
         * 构造函数
         */
        public InterProcessSemaphoreMutex(CuratorFramework client, String path);
    
        /**
         * 申请获取锁
         */
        @Override
        public void acquire() throws Exception;
    
        @Override
        public boolean acquire(long time, TimeUnit unit) throws Exception;
    
        /**
         * 释放锁
         */
        @Override
        public void release() throws Exception;
    
        /**
         * 如果此JVM中的线程获取了互斥锁,则返回true
         */
        @Override
        public boolean isAcquiredInThisProcess();
    }
    

    2.3.3 InterProcessReadWriteLock

           InterProcessReadWriteLock 读写锁。和ReadWriteLock类似。

    InterProcessReadWriteLock api 介绍

    public class InterProcessReadWriteLock {
    
        /**
         * 读锁
         */
        private final InterProcessMutex readMutex;
        /**
         * 写锁
         */
        private final InterProcessMutex writeMutex;
        
        /**
         * 构造函数
         */
        public InterProcessReadWriteLock(CuratorFramework client, String basePath)
    
        /**
         * 构造函数
         * lockData是存储在节点上的数据
         */
        public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData);
    
        /**
         * 获取读锁
         */
        public InterProcessMutex     readLock();
    
        /**
         * 获取写锁
         */
        public InterProcessMutex     writeLock();
    
    }
    

    2.3.4 信号量(InterProcessSemaphoreV2)

           InterProcessSemaphoreV2实现了一个跨jvm的信号量,主要工作原理是:acquire时创建一个临时顺序节点,如果创建成功且临时节点数小于等于maxLeases则说明信号量获取成功,否则wait等待,等待目录发生变化或计数改变时唤醒。和Semaphore的功能类似.

           分布式信号量的使用。我们需要了解以下三个类。

    • InterProcessSemaphoreV2:信号量实现类
    • Lease:租约(单个信号)
    • SharedCountReader:计数器,用于计算最大租约数量

    InterProcessSemaphoreV2 api 介绍

    public class InterProcessSemaphoreV2 {
    
        /**
         * 构造函数
         * @param client    CuratorFramework
         * @param path      节点路径
         * @param maxLeases 允许此实例的最大租约数
         */
        public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);
    
        /**
         * 构造函数
         * @param client CuratorFramework
         * @param path   节点路径
         * @param count  用于最大租约的共享计数
         */
        public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count);
    
        /**
         * 此信号量创建的节点放置的数据,必须在调用其中一个acquire()方法之前调用它
         */
        public void setNodeData(byte[] nodeData);
    
        /**
         * 返回参与信号量的所有当前节点的列表
         */
        public Collection<String> getParticipantNodes() throws Exception;
    
        /**
         * 关闭给定租约集合中的所有租约
         */
        public void returnAll(Collection<Lease> leases);
    
        /**
         * 关闭租约
         */
        public void returnLease(Lease lease);
    
        /**
         * 获取租约,如果没有租约获取会一直阻塞直到获取到租约
         */
        public Lease acquire() throws Exception;
        public Lease acquire(long time, TimeUnit unit) throws Exception
    
        /**
         * 获取指定数量的租约,如果没有获取到制定数量租约会一直阻塞
         */
        public Collection<Lease> acquire(int qty) throws Exception;
        public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception;
        
    }
    

    2.3.5 InterProcessMultiLock(多共享锁对象

           它可以把多个锁包含起来像一个锁一样进行操作,简单来说就是对多个锁进行一组操作。当acquire的时候就获得多个锁资源,否则失败。同样调用release时所有的锁都被release(失败被忽略)。

    InterProcessMultiLock api 介绍

    public class InterProcessMultiLock implements InterProcessLock {
    
        /**
         * 构造函数
         *
         * @param client CuratorFramework
         * @param paths 节点列表对应的路径(多个路径就是多个锁)
         */
        public InterProcessMultiLock(CuratorFramework client, List<String> paths);
    
        /**
         * 构造函数
         */
        public InterProcessMultiLock(List<InterProcessLock> locks);
    
        /**
         * 请求锁
         */
        @Override
        public void acquire() throws Exception;
        @Override
        public boolean acquire(long time, TimeUnit unit) throws Exception;
    
        /**
         * 释放锁
         */
        @Override
        public synchronized void release() throws Exception;
    
        /**
         * 如果此JVM中的线程获取了所有的锁,则返回true
         */
        @Override
        public synchronized boolean isAcquiredInThisProcess();
    }
    

    2.4 分布式计数器

           计数器是用来计数的,利用ZooKeeper可以实现一个分布式计数器。只要使用相同的path就可以得到最新的计数器值,这是由ZooKeeper的一致性保证的。Curator有两个计数器,一个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。

    2.4.1 SharedCount(int计数器)

           SharedCount使用int类型来计数。相当于多个zookeeper客户端公用一个计算器。

    • SharedCount:计数器的具体实现。
    • SharedCountListener:监听数据的改变。

    SharedCount api 介绍

    public class SharedCount implements Closeable, SharedCountReader, Listenable<SharedCountListener> {
    
    
        /**
         * 构造函数
         * @param client CuratorFramework
         * @param path 计数器依赖的节点
         * @param seedValue 如果当前节点对应的计数器没有值,就会用该值
         */
        public SharedCount(CuratorFramework client, String path, int seedValue);
        protected SharedCount(CuratorFramework client, String path, SharedValue sv);
    
        /**
         * 获取当前计数
         */
        @Override
        public int getCount();
    
        /**
         * 获取当前节点对应的版本信息
         */
        @Override
        public VersionedValue<Integer> getVersionedValue();
    
        /**
         * 设置计数器的值
         */
        public void     setCount(int newCount) throws Exception;
    
        /**
         * 设置计数器的值,这里要注意如果当前版本的值在这个时刻有改变则设置不成功。CAS操作
         */
        public boolean  trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;
    
        /**
         * 添加监听器
         */
        @Override
        public void     addListener(SharedCountListener listener);
        @Override
        public void     addListener(final SharedCountListener listener, Executor executor);
    
        /**
         * 移除监听器
         */
        @Override
        public void     removeListener(SharedCountListener listener);
    
        /**
         * 启动
         */
        public void     start() throws Exception;
    
        /**
         * 结束
         */
        @Override
        public void close() throws IOException;
    }
    

           SharedCount使用实例。模拟了10个zookeeper客户端。每个客户端都加5次。最终结果50就对了。

    public class SharedCountTest {
    
        private static final String PATH_COUNTER = "/int/counter";
    
        class CounterThread extends Thread {
    
            private final CountDownLatch countDownLatch;
            private final int threadIndex;
            private final SharedCount counter;
    
            CounterThread(SharedCount counter, int index, CountDownLatch countDownLatch) {
                this.counter = counter;
                this.threadIndex = index;
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    for (int index = 0; index < 5; index++) {
                        while (true) {
                            Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
                            boolean success = counter.trySetCount(counter.getVersionedValue(), counter.getCount() + 1);
                            if (success) {
                                break;
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        System.out.println("当前值为:" + counter.getCount());
                        counter.close();
                    } catch (Exception e) {
                        //ignore
                    }
                    countDownLatch.countDown();
                }
    
            }
        }
    
        @Test
        public void sharedCount() throws Exception {
            CountDownLatch countDownLatch = new CountDownLatch(10);
            List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
    
            // 启动10个zookeeper客户端
            for (int index = 0; index < 10; index++) {
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(6000)
                        .connectionTimeoutMs(6000)
                        .build();
                // 启动客户端
                client.start();
                zookeeperClientList.add(client);
            }
    
            // 如果节点存在,我们就删除节点
            zookeeperClientList.get(0).delete().forPath(PATH_COUNTER);
    
            for (int index = 0; index < zookeeperClientList.size(); index++) {
                SharedCount sharedCount = new SharedCount(zookeeperClientList.get(index), PATH_COUNTER, 0);
                sharedCount.addListener(new SharedCountListener() {
                    @Override
                    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
                        System.out.println("计数器值改变,现在的值为:" + newCount);
                    }
    
                    @Override
                    public void stateChanged(CuratorFramework client, ConnectionState newState) {
                        // 连接状态改变
                    }
                });
                sharedCount.start();
                new CounterThread(sharedCount, index, countDownLatch).start();
            }
    
            countDownLatch.await();
            zookeeperClientList.forEach(curatorFramework -> {
                // 关闭客户端
                curatorFramework.close();
            });
        }
    
    }
    

    2.4.2 DistributedAtomicLong(long计数器)

           DistributedAtomicLong使用Long类型来计数。

    DistributedAtomicLong api 介绍

    public class DistributedAtomicLong implements DistributedAtomicNumber<Long> {
    
        /**
         * 构造函数(乐观锁模式)
         *
         * @param client CuratorFramework
         * @param counterPath 节点路径
         * @param retryPolicy 重试策略 -- 乐观加锁
         */
        public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy);
    
        /**
         * 构造函数,retryPolicy(乐观加锁)还没成功,则进行promotedToLock的方式以互斥的方式加锁重试 (排他锁模式)
         *
         * @param client CuratorFramework
         * @param counterPath 节点路径
         * @param retryPolicy 重试策略 -- 乐观加锁
         * @param promotedToLock 排他锁策略
         */
        public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy, PromotedToLock promotedToLock);
    
        /**
         * 获取当前值
         */
        @Override
        public AtomicValue<Long>     get() throws Exception
    
        /**
         * 强制设置计数值
         */
        @Override
        public void forceSet(Long newValue) throws Exception;
    
        /**
         * CAS更新(乐观锁模式更新)
         */
        @Override
        public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) throws Exception;
    
        /**
         * 设置值
         */
        @Override
        public AtomicValue<Long>   trySet(Long newValue) throws Exception;
    
        /**
         * 如果之前没有初始值,则把初始值设置进去
         */
        @Override
        public boolean initialize(Long initialize) throws Exception;
    
        /**
         * +1
         */
        @Override
        public AtomicValue<Long>    increment() throws Exception;
    
        /**
         * -1
         */
        @Override
        public AtomicValue<Long>    decrement() throws Exception;
    
        /**
         * 加一个指定的值
         */
        @Override
        public AtomicValue<Long>    add(Long delta) throws Exception;
    
        /**
         * 键一个指定的值
         */
        @Override
        public AtomicValue<Long> subtract(Long delta) throws Exception;
    
    
    }
    

           DistributedAtomicLong怎么使用,直接给实例。也是模拟10个客户端,每个客户端增加5次。最终结果得到50就对了。

    public class DistributedAtomicLongTest {
    
        private static final String PATH_COUNTER = "/long/counter";
    
        class CounterThread extends Thread {
    
            private final CountDownLatch countDownLatch;
            private final int threadIndex;
            private final DistributedAtomicLong counter;
    
            CounterThread(DistributedAtomicLong counter, int index, CountDownLatch countDownLatch) {
                this.counter = counter;
                this.threadIndex = index;
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    for (int index = 0; index < 5; index++) {
                        // 保证成功
                        while (true) {
                            AtomicValue<Long> value = counter.increment();
                            if (value.succeeded()) {
                                System.out.println("succeed: " + value.succeeded() + " value:" + value.postValue());
                                break;
                            }
                            Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
    
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
    
            }
        }
    
        @Test
        public void distributedAtomicLong() throws Exception {
            CountDownLatch countDownLatch = new CountDownLatch(10);
            List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
    
            // 启动10个zookeeper客户端
            for (int index = 0; index < 10; index++) {
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(6000)
                        .connectionTimeoutMs(6000)
                        .build();
                // 启动客户端
                client.start();
                zookeeperClientList.add(client);
            }
    
            // 如果节点存在,我们就删除节点
            if (zookeeperClientList.get(0).checkExists().forPath(PATH_COUNTER) != null) {
                zookeeperClientList.get(0).delete().forPath(PATH_COUNTER);
            }
    
            for (int index = 0; index < zookeeperClientList.size(); index++) {
                // 乐观锁模式
                DistributedAtomicLong count = new DistributedAtomicLong(zookeeperClientList.get(index), PATH_COUNTER, new RetryNTimes(10, 10));
                boolean initializeSuccess = count.initialize(0L);
                if (initializeSuccess) {
                    System.out.println("初始化成功");
                } else {
                    System.out.println("初始化失败");
                }
                new CounterThread(count, index, countDownLatch).start();
            }
    
            countDownLatch.await();
            zookeeperClientList.forEach(curatorFramework -> {
                // 关闭客户端
                curatorFramework.close();
            });
        }
    
    }
    

    2.5 分布式队列

    2.5.4 简单队列 - SimpleDistributedQueue

           SimpleDistributedQueue是一种简单队列,和jdk中队列类似,拥有offer()、take()方法。

           SimpleDistributedQueue的使用是很简单的,所以我们就直接给出SimpleDistributedQueue的使用实例了。

    public class SimpleDistributedQueueTest {
    
        private static final String SIMPLE_DISTRIBUTED_QUEUE_PATH = "/SimpleDistributedQueue";
    
        class QueueActionThread extends Thread {
    
            private final SimpleDistributedQueue queue;
            private final CountDownLatch countDownLatch;
            private final int queueIndex;
    
            QueueActionThread(SimpleDistributedQueue queue, int index, CountDownLatch countDownLatch) {
                this.queue = queue;
                this.queueIndex = index;
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    for (int index = 0; index < 5; index++) {
                        String message = "我是队列:" + queueIndex + " 的第-" + index + "-条消息";
                        this.queue.offer(message.getBytes());
                    }
                    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
                    for (int index = 0; index < 5; index++) {
                        byte[] queueItem = queue.take();
                        System.out.println("我是队列:" + queueIndex + " 我收到了:" + new String(queueItem));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
    
            }
        }
    
        @Test
        public void simpleDistributedQueue() throws Exception {
            CountDownLatch countDownLatch = new CountDownLatch(10);
            List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
    
            // 启动10个zookeeper客户端
            for (int index = 0; index < 10; index++) {
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(6000)
                        .connectionTimeoutMs(6000)
                        .build();
                // 启动客户端
                client.start();
                zookeeperClientList.add(client);
            }
    
            for (int index = 0; index < zookeeperClientList.size(); index++) {
                SimpleDistributedQueue queue = new SimpleDistributedQueue(zookeeperClientList.get(index), SIMPLE_DISTRIBUTED_QUEUE_PATH);
                new QueueActionThread(queue, index, countDownLatch).start();
            }
    
            countDownLatch.await();
            // 关闭客户端
            zookeeperClientList.forEach(CuratorFramework::close);
        }
    
    }
    

    2.5.2 普通队列 - DistributedQueue

           DistributedQueue是一种非常谱图的队列,没啥骚操作。

           DistributedQueue的使用也是非常简单的,我们也直接给出DistributedQueue的使用实例。

    public class DistributedQueueTest {
    
        private static final String DISTRIBUTED_QUEUE_PATH = "/queue/distributedQueue";
    
        class QueueActionThread extends Thread {
    
            private final DistributedQueue<String> queue;
            private final CountDownLatch countDownLatch;
            private final int queueIndex;
    
            QueueActionThread(DistributedQueue<String> queue, int index, CountDownLatch countDownLatch) {
                this.queue = queue;
                this.queueIndex = index;
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    this.queue.start();
                    for (int index = 0; index < 5; index++) {
                        queue.put("队列 " + queueIndex + " 发来的消息:" + index);
                        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
    
            }
        }
    
    
    
        @Test
        public void distributedQueue() throws Exception {
    
            CountDownLatch countDownLatch = new CountDownLatch(10);
            List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
    
            // 启动10个zookeeper客户端
            for (int index = 0; index < 10; index++) {
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(6000)
                        .connectionTimeoutMs(6000)
                        .build();
                // 启动客户端
                client.start();
                zookeeperClientList.add(client);
            }
    
            for (int index = 0; index < zookeeperClientList.size(); index++) {
                QueueBuilder<String> queueBuild = QueueBuilder.builder(zookeeperClientList.get(index), index % 2 == 0 ? new ConsumerImp(index + "") : null, createQueueSerializer(), DISTRIBUTED_QUEUE_PATH);
                DistributedQueue<String> queue = queueBuild.buildQueue();
                new QueueActionThread(queue, index, countDownLatch).start();
            }
    
            countDownLatch.await();
            // 关闭客户端
            zookeeperClientList.forEach(CuratorFramework::close);
    
        }
    
        /**
         * 队列消息序列化实现类
         */
        private static QueueSerializer<String> createQueueSerializer() {
            return new QueueSerializer<String>() {
                @Override
                public byte[] serialize(String item) {
                    return item.getBytes();
                }
    
                @Override
                public String deserialize(byte[] bytes) {
                    return new String(bytes);
                }
            };
        }
    
        private class ConsumerImp implements QueueConsumer<String>{
    
            private String consumerName;
    
            public ConsumerImp(String consumerName) {
                this.consumerName = consumerName;
            }
    
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(consumerName + " 收到消息: " + message);
            }
    
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
    
            }
        }
    
    }
    

    2.5.3 带id的队列 - DistributedIdQueue

           DistributedIdQueue相对于DistributedQueue来说就是队列里面的每个id都带有一个id。所以DistributedIdQueue可以根据id删除队列里面的数据。其他部分和DistributedQueue一样。实例我们就不写了。

    2.5.4 优先级队列 - DistributedPriorityQueue

           DistributedPriorityQueue是带有优先级的队列,优先级别高的先消费。使用和DistributedQueue是差不多的。实例我们就不写了。

    2.5.4 延迟队列 - DistributedDelayQueue

           DistributedDelayQueue是带有延时功能的队列。消息入队的时候可以指定延时时间。让该消息延时一段时间之后才可以被消费。用法和DistributedQueue差不多。就不写具体的实例代码了。

    2.6 分布式屏障 - Barrier

           分布式Barrier是这样一个功能:它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点继续进行。

    2.6.1 DistributedBarrier

           DistributedBarrier允许多个分布式线程任务等待放行。直到有地方说放行则这些分布式线程进入执行任务。

    DistributedBarrier api 介绍。

    public class DistributedBarrier {
    
        /**
         * @param client CuratorFramework
         * @param barrierPath barrier路径节点
         */
        public DistributedBarrier(CuratorFramework client, String barrierPath);
    
        /**
         * 设置栅栏,它将阻塞在它上面等待的线程:
         */
        public synchronized void         setBarrier() throws Exception;
    
        /**
         * 设置栅栏
         */
        public synchronized void      removeBarrier() throws Exception;
    
        /**
         * 等待放行条件
         */
        public synchronized void      waitOnBarrier() throws Exception
        public synchronized boolean      waitOnBarrier(long maxWait, TimeUnit unit) throws Exception;
    }
    

           DistributedBarrier的使用。比如这里我们模拟了10个zookeeper客户端。等待放行。

    public class DistributedBarrierTest {
    
        private static final String BARRIER_PATH_COUNTER = "/barrier";
    
        class LogicThread extends Thread {
    
            private final CountDownLatch countDownLatch;
            private final int threadIndex;
            private final DistributedBarrier barrier;
    
            LogicThread(DistributedBarrier barrier, int index, CountDownLatch countDownLatch) {
                this.barrier = barrier;
                this.threadIndex = index;
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    System.out.println("线程: " + threadIndex + " 请求进入");
                    // 阻塞等待
                    barrier.waitOnBarrier();
                    System.out.println("线程: " + threadIndex + " 成功进入");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
    
            }
        }
    
        @Test
        public void distributedBarrier() throws Exception {
    
            CountDownLatch countDownLatch = new CountDownLatch(10);
            List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
    
            // 启动10个zookeeper客户端
            for (int index = 0; index < 10; index++) {
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(6000)
                        .connectionTimeoutMs(6000)
                        .build();
                // 启动客户端
                client.start();
                zookeeperClientList.add(client);
            }
    
            // 如果节点存在,我们就删除节点
            if (zookeeperClientList.get(0).checkExists().forPath(BARRIER_PATH_COUNTER) != null) {
                zookeeperClientList.get(0).delete().forPath(BARRIER_PATH_COUNTER);
            }
            DistributedBarrier controlBarrier = new DistributedBarrier(zookeeperClientList.get(0), BARRIER_PATH_COUNTER);
            controlBarrier.setBarrier();
            for (int index = 0; index < zookeeperClientList.size(); index++) {
                DistributedBarrier barrier = new DistributedBarrier(zookeeperClientList.get(index), BARRIER_PATH_COUNTER);
                new LogicThread(barrier, index, countDownLatch).start();
            }
    
            Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
            controlBarrier.removeBarrier();
            countDownLatch.await();
            zookeeperClientList.forEach(curatorFramework -> {
                // 关闭客户端
                curatorFramework.close();
            });
    
        }
    
    }
    

    2.6.2 DistributedDoubleBarrier

           DistributedDoubleBarrier:类似CyclicBarrier 。允许多个分布式线程等待,等线程个数达到了指定数量的时候,就可以同时执行或者同时退出了。

    DistributedDoubleBarrier api 的使用

    public class DistributedDoubleBarrier {
    
        /**
         * 构造函数,
         * memberQty是成员数量,当enter()方法被调用时,成员被阻塞,直到所有的成员都调用了enter()
         * 当leave()方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave()
         */
        public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty);
    
        /**
         * 进入栅栏并且阻塞,直到所有的成员都进入
         */
        public void     enter() throws Exception;
        public boolean     enter(long maxWait, TimeUnit unit) throws Exception;
    
        /**
         * 退出栅栏并且阻塞,知道所有的成员都退出
         */
        public synchronized void     leave() throws Exception;
        public synchronized boolean     leave(long maxWait, TimeUnit unit) throws Exception;
        
    }
    

           DistributedDoubleBarrier的简单使用,我们模拟10个zookeeper客户端。当有五个说要执行或者退出的时候。我们就执行或者退出。

    public class DistributedDoubleBarrierTest {
    
        private static final String BARRIER_PATH_COUNTER = "/barrier";
    
        class LogicThread extends Thread {
    
            private final CountDownLatch countDownLatch;
            private final int threadIndex;
            private final DistributedDoubleBarrier barrier;
    
            LogicThread(DistributedDoubleBarrier barrier, int index, CountDownLatch countDownLatch) {
                this.barrier = barrier;
                this.threadIndex = index;
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    Uninterruptibles.sleepUninterruptibly(5 * threadIndex, TimeUnit.SECONDS);
                    System.out.println("线程:" + threadIndex + " 请求进入");
                    barrier.enter();
                    System.out.println("线程:" + threadIndex + " 成功进入");
    
                    System.out.println("线程:" + threadIndex + " 请求离开");
                    barrier.leave();
                    System.out.println("线程:" + threadIndex + " 成功离开");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
    
            }
        }
    
        @Test
        public void distributedDoubleBarrier() throws Exception {
            CountDownLatch countDownLatch = new CountDownLatch(10);
            List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
    
            // 启动10个zookeeper客户端
            for (int index = 0; index < 10; index++) {
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(6000)
                        .connectionTimeoutMs(6000)
                        .build();
                // 启动客户端
                client.start();
                zookeeperClientList.add(client);
            }
    
            // 如果节点存在,我们就删除节点
            if (zookeeperClientList.get(0).checkExists().forPath(BARRIER_PATH_COUNTER) != null) {
                zookeeperClientList.get(0).delete().forPath(BARRIER_PATH_COUNTER);
            }
    
            for (int index = 0; index < zookeeperClientList.size(); index++) {
                DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(zookeeperClientList.get(index), BARRIER_PATH_COUNTER, 5);
                new LogicThread(barrier, index, countDownLatch).start();
            }
    
            countDownLatch.await();
            zookeeperClientList.forEach(curatorFramework -> {
                // 关闭客户端
                curatorFramework.close();
            });
        }
    
    }
    

    三 Spring Boot使用Curator

           Spring Boot中使用Curator,我们要想办法创建一个zookeeper客户端.然后把这个客户端对象添加到Spring容器中去.这样我们就可以在各个地方拿到这个zookeeper客户端对象.

           说先我们创建一个ZkClient类.这个ZkClient类就代码我们一个zookeeper客户端.

    public class ZkClient {
    
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        /**
         * zookeeper客户端实例
         */
        private CuratorFramework client;
        /**
         * 服务器列表,格式host1:port1,host2:port2,...
         */
        private String zookeeperServer;
        /**
         * 会话超时时间,单位毫秒,默认60000ms
         */
        private int sessionTimeoutMs;
        /**
         * 连接创建超时时间,单位毫秒,默认60000ms
         */
        private int connectionTimeoutMs;
        /**
         * 重试之间等待的初始时间
         */
        private int baseSleepTimeMs;
        /**
         * 当连接异常时的重试次数
         */
        private int maxRetries;
        /**
         * 为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间
         */
        private String namespace;
    
        public void setZookeeperServer(String zookeeperServer) {
            this.zookeeperServer = zookeeperServer;
        }
        
    
        public void setSessionTimeoutMs(int sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
        }
    
        public void setConnectionTimeoutMs(int connectionTimeoutMs) {
            this.connectionTimeoutMs = connectionTimeoutMs;
        }
        public void setBaseSleepTimeMs(int baseSleepTimeMs) {
            this.baseSleepTimeMs = baseSleepTimeMs;
        }
    
        public void setMaxRetries(int maxRetries) {
            this.maxRetries = maxRetries;
        }
    
        public void setNamespace(String namespace) {
            this.namespace = namespace;
        }
    
        /**
         * spring 自动调用,不需要我们主动调用
         */
        public void init() {
            // 创建客户端
            // 重连规则
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
            client = CuratorFrameworkFactory.builder()
                    .connectString(zookeeperServer)
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .namespace(namespace)
                    .build();
            // 启动客户端,连接服务器
            client.start();
        }
    
        /**
         * spring 自动调用,不需要我们主动调用
         */
        public void stop() {
            // 关闭客户端
            client.close();
        }
    
        /**
         * 获取 zookeeper 客户端对象
         *
         * @return CuratorFramework
         */
        public CuratorFramework getClient() {
            return client;
        }
    
    }
    

           接下来我们把ZkClient添加到Srping容器里面去.而且这里我们把一些动态配置信息都放到了application.yml文件里面去了.

    @Configuration
    public class ZkConfiguration {
    
        /**
         * 服务器列表,格式host1:port1,host2:port2,...
         */
        @Value("${zookeeper.server}")
        private String zookeeperServer;
        /**
         * 会话超时时间,单位毫秒,默认60000ms
         */
        @Value(("${zookeeper.sessionTimeoutMs}"))
        private int sessionTimeoutMs;
        /**
         * 连接创建超时时间,单位毫秒,默认60000ms
         */
        @Value("${zookeeper.connectionTimeoutMs}")
        private int connectionTimeoutMs;
        /**
         * 当连接异常时的重试次数
         */
        @Value("${zookeeper.maxRetries}")
        private int maxRetries;
        /**
         * 重试之间等待的初始时间
         */
        @Value("${zookeeper.baseSleepTimeMs}")
        private int baseSleepTimeMs;
        /**
         * 为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间
         */
        @Value("${zookeeper.namespace}")
        private String namespace;
    
        @Bean(initMethod = "init", destroyMethod = "stop")
        public ZkClient zkClient() {
            ZkClient zkClient = new ZkClient();
            zkClient.setZookeeperServer(zookeeperServer);
            zkClient.setSessionTimeoutMs(sessionTimeoutMs);
            zkClient.setConnectionTimeoutMs(connectionTimeoutMs);
            zkClient.setMaxRetries(maxRetries);
            zkClient.setBaseSleepTimeMs(baseSleepTimeMs);
            zkClient.setNamespace(namespace);
            return zkClient;
        }
    
    }
    

    application.yml文件增加配置信息

    
    # zeekeeper配置
    zookeeper:
      server: 127.0.0.1:2181 # 服务器列表,格式host1:port1,host2:port2,...
      sessionTimeoutMs: 6000 # 会话超时时间,单位毫秒,默认60000ms
      connectionTimeoutMs: 6000 # 连接创建超时时间,单位毫秒,默认60000ms
      maxRetries: 3 # 当连接异常时的重试次数
      baseSleepTimeMs: 1000 # 重试之间等待的初始时间
      namespace: lock # 为了实现不同的Zookeeper业务之间的隔离,有的时候需要为每个业务分配一个独立的命名空间,不需要的时候可以去掉
    
    

           这样我们就可以在我们项目里面的任何地方得到ZkClient对象了.我们可以在zookeeper客户端为所欲为了.


           到此关于java zookeeper客户端Curator的使用部分就讲完了.文章中设计到的所有实例代码在 https://github.com/tuacy/java-study工程目录的zookeeper文件下面可以找到.

    相关文章

      网友评论

          本文标题:ZooKeeper客户端Curator使用

          本文链接:https://www.haomeiwen.com/subject/uuodsctx.html