美文网首页
3、zk客户端curator使用(转)

3、zk客户端curator使用(转)

作者: 小manong | 来源:发表于2019-05-02 13:30 被阅读0次

    一、curator简介

    • Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。

    curator-framework:对zookeeper的底层api的一些封装
    curator-client:提供一些客户端的操作,例如重试策略等
    curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

    • 版本问题。使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败
    <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.12.0</version>
            </dependency>
    

    二、Curator-framework基本api使用

    1、创建会话

    (1)静态工厂方法创建会话

    String connectionInfo = "127.0.0.1:2181";
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory
                    .newClient(connectionInfo, 5000, 3000, retryPolicy);
    

    newClient静态工厂方法包含四个主要参数:


    静态工厂参数

    (2)使用fluent流式创建

      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client =
            CuratorFrameworkFactory.builder()
                    .connectString(connectionInfo)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
    

    (3)创建包含命名空间的会话

    • 为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client =
            CuratorFrameworkFactory.builder()
                    .connectString(connectionInfo)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    #创建命名空间为namespace的会话(该会话内的操作都是基于该目录进行的)
                    .namespace("namespace")
                    .build();
    
    2、启动客户端
    client.start();
    
    3、创建数据节点
    • Zookeeper的节点创建模式:

    PERSISTENT:持久化
    PERSISTENT_SEQUENTIAL:持久化并且带序列号
    EPHEMERAL:临时
    EPHEMERAL_SEQUENTIAL:临时并且带序列号

    //创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点
    client.create()
          .creatingParentContainersIfNeeded()
          .withMode(CreateMode.EPHEMERAL)
          .forPath("path","init".getBytes());
    
    4、删除节点
    //删除一个节点,并且递归删除其所有的子节点
    client.delete().deletingChildrenIfNeeded().forPath("path");
    //删除一个节点,强制指定版本进行删除
    client.delete().withVersion(10086).forPath("path");
    //删除一个节点,强制保证删除,guaranteed()接口是一个保障措施,只要客户端会话有效,
    //那么Curator会在后台持续进行删除操作,直到删除节点成功。
    client.delete().guaranteed().forPath("path");
    
    
    5、读取数据
    //读取一个节点的数据内容,同时获取到该节点的stat
    读取一个节点的数据内容,同时获取到该节点的stat
    
    6、更新节点数据
    //更新一个节点的数据内容,该接口会返回一个Stat实例
     Stat path = client.setData().forPath("path", "data".getBytes());
    //更新一个节点的数据内容,强制指定版本进行更新
    client.setData().withVersion(10086).forPath("path","data".getBytes());
    
    7、检测节点是否存在
     Stat stat = client.checkExists().forPath("path");
    
    8、获取某个节点的所有子节点路径
    //该方法的返回值为List<String>,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处
    //理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
    client.getChildren().forPath("path");
    
    9、事物
    • CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。一个例子如下:
    client.inTransaction().check().forPath("path")
          .and()
          .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
          .and()
          .setData().withVersion(10086).forPath("path","data2".getBytes())
          .and()
          .commit();
    
    10、异步接口
    • 上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
    • CuratorEventType:


      CuratorEventType对应的事件类型
    • 响应码(#getResultCode()):


      状态响应码
    //异步的创建节点
       Executor executor = Executors.newFixedThreadPool(2);
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .inBackground((curatorFramework, curatorEvent) -> {
                        System.out.println(String.format("eventType:%s,resultCode:%s", 
                                curatorEvent.getType(), curatorEvent.getResultCode()));
                    }, executor)
                    .forPath("path");
    

    二、Curator-recipes实现高级特征

    1、缓存cache
    • Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。

    (1)Path Cache

    • Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。涉及到的四个类:

    PathChildrenCache、PathChildrenCacheEvent、PathChildrenCacheListener、ChildData

    //1、构造函数
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
    //2、想使用cache,必须调用它的start方法,使用完后调用close方法。可以设置StartMode来实现启动的模式,NORMAL:正常初始化。BUILD_INITIAL_CACHE:在调用start()之前会调用rebuild()。POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件
    //3、可以增加listener监听缓存的变化。
    public void addListener(PathChildrenCacheListener listener)
    //4、遍历所有的子节点
    getCurrentData()

     private static final String PATH = "/example/pathCache";
    
        public static void main(String[] args) throws Exception {
            //1.创建client并启动
            String connectionInfo = "127.0.0.1:2181";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory
                    .newClient(connectionInfo, 5000, 3000, retryPolicy);
            client.start();
            //2.创建path cache并启动
            PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
            cache.start();
            //3、进行事件监听
            PathChildrenCacheListener cacheListener = (client1, event) -> {
                System.out.println("事件类型:" + event.getType());
                //如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的
                // event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。
                if (null != event.getData()) {
                    System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
                }
            };
            cache.getListenable().addListener(cacheListener);
            //4、进行响应的curd操作,看看事件回调结果
            client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
            Thread.sleep(10);
            client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
            Thread.sleep(10);
            client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
            Thread.sleep(10);
            for (ChildData data : cache.getCurrentData()) {
                System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
            }
            client.delete().forPath("/example/pathCache/test01");
            Thread.sleep(10);
            client.delete().forPath("/example/pathCache/test02");
            Thread.sleep(1000 * 5);
            cache.close();
            client.close();
            System.out.println("OK!");
        }
    

    (2)Node Cache

    • NodeCache只能监听一个节点的状态变化。
    • Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:

    NodeCache - Node Cache实现类
    NodeCacheListener - 节点监听器
    ChildData - 节点数据

     client.create().creatingParentsIfNeeded().forPath(PATH);
            final NodeCache cache = new NodeCache(client, PATH);
            NodeCacheListener listener = () -> {
                ChildData data = cache.getCurrentData();
                if (null != data) {
                    System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
                } else {
                    System.out.println("节点被删除!");
                }
            };
            cache.getListenable().addListener(listener);
            cache.start();
            client.setData().forPath(PATH, "01".getBytes());
            Thread.sleep(100);
            client.setData().forPath(PATH, "02".getBytes());
            Thread.sleep(100);
            client.delete().deletingChildrenIfNeeded().forPath(PATH);
            Thread.sleep(1000 * 2);
            cache.close();
            client.close();
            System.out.println("OK!");
    

    (3)Tree Cache

    • Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合,主要涉及到下面四个类:

    TreeCache - Tree Cache实现类
    TreeCacheListener - 监听器类
    TreeCacheEvent - 触发的事件类
    ChildData - 节点数据

    • 注意:TreeCache在初始化(调用start()方法)的时候会回调TreeCacheListener实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()很有可能导致空指针异常,这里应该主动处理并避免这种情况。
     client.create().creatingParentsIfNeeded().forPath(PATH);
            TreeCache cache = new TreeCache(client, PATH);
            TreeCacheListener listener = (client1, event) ->
                    System.out.println("事件类型:" + event.getType() +
                            " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
            cache.getListenable().addListener(listener);
            cache.start();
            client.setData().forPath(PATH, "01".getBytes());
            Thread.sleep(100);
            client.setData().forPath(PATH, "02".getBytes());
            Thread.sleep(100);
            client.delete().deletingChildrenIfNeeded().forPath(PATH);
            Thread.sleep(1000 * 2);
            cache.close();
            client.close();
            System.out.println("OK!");
    
    2、leader选举
    • Curator 有两种leader选举的recipe,分别是LeaderSelector和LeaderLatch。前者是所有存活的客户端不间断的轮流做Leader,大同社会。后者是一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。
      (1)LeaderLatch
    public class LeaderLatchTest {
        /**
         * 1、一旦启动,LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后其中一个最终会被选举为leader,
         * 2、可以通过hasLeadership方法查看LeaderLatch实例是否leader:
         * 3、 LeaderLatch在请求成为leadership会block(阻塞),一旦不使用LeaderLatch了,必须调用close方法。
         * 如果它是leader,会释放leadership, 其它的参与者将会选举一个leader。
         * 4、 错误处理:LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当 SUSPENDED 或 LOST 时,
         * leader不再认为自己还是leader。当LOST后连接重连后RECONNECTED,LeaderLatch会删除先前的ZNode然后重新
         * 创建一个。LeaderLatch用户必须考虑导致leadership丢失的连接问题。
         * 强烈推荐你使用ConnectionStateListener
         */
    
        protected static String PATH = "/francis/leader";
        private static final int CLIENT_QTY = 10;
    
        public static void main(String[] args) throws Exception {
            List<CuratorFramework> clients = Lists.newArrayList();
            List<LeaderLatch> examples = Lists.newArrayList();
            String connectionInfo = "127.0.0.1:2181";
            try {
                //1、先创建10个leaderLatch
                for (int i = 0; i < CLIENT_QTY; i++) {
                    CuratorFramework client
                            = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
                    clients.add(client);
                    LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                    latch.addListener(new LeaderLatchListener() {
                        @Override
                        public void isLeader() {
                            // TODO Auto-generated method stub
                            System.out.println("I am Leader");
                        }
    
                        @Override
                        public void notLeader() {
                            // TODO Auto-generated method stub
                            System.out.println("I am not Leader");
                        }
                    });
                    examples.add(latch);
                    //2、启动后,选中一个作为leader
                    client.start();
                    latch.start();
                }
                Thread.sleep(10000);
                LeaderLatch currentLeader = null;
                //3、通过hasLeadership查看自己是否是leader, 如果是的话返回true。
                for (LeaderLatch latch : examples) {
                    if (latch.hasLeadership()) {
                        currentLeader = latch;
                    }
                }
                System.out.println("current leader is " + currentLeader.getId());
                System.out.println("release the leader " + currentLeader.getId());
                //4、close释放当前的领导权。
                currentLeader.close();
    
                Thread.sleep(5000);
    
                for (LeaderLatch latch : examples) {
                    if (latch.hasLeadership()) {
                        currentLeader = latch;
                    }
                }
                System.out.println("current leader is " + currentLeader.getId());
                System.out.println("release the leader " + currentLeader.getId());
            } finally {
                for (LeaderLatch latch : examples) {
                    if (null != latch.getState() && !latch.getState().equals(LeaderLatch.State.CLOSED))
                        latch.close();
                }
                for (CuratorFramework client : clients) {
                    client.close();
                }
            }
        }
    }
    

    (2)LeaderSelector

    • LeaderSelector使用的时候主要涉及下面几个类:

    LeaderSelector
    LeaderSelectorListener
    LeaderSelectorListenerAdapter
    CancelLeadershipException

    • 类似LeaderLatch,LeaderSelector必须start: leaderSelector.start(); 一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用。而takeLeadership()方法只有领导权被释放时才返回。 当你不再使用LeaderSelector实例时,应该调用它的close方法。
    public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
        private final String name;
        private final LeaderSelector leaderSelector;
        //使用AtomicInteger来记录此client获得领导权的次数, 它是”fair”, 每个client有平等的机会获得领导权。
        private final AtomicInteger leaderCount = new AtomicInteger();
    
        public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
            this.name = name;
            leaderSelector = new LeaderSelector(client, path, this);
            //调用 leaderSelector.autoRequeue();保证在此实例释放领导权之后还可能获得领导权。
            leaderSelector.autoRequeue();
        }
    
        public void start() throws IOException {
            leaderSelector.start();
        }
    
        @Override
        public void close() throws IOException {
            leaderSelector.close();
        }
    
        /**
         * leaderSelector.start(); 一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用。
         * 而takeLeadership()方法只有领导权被释放时才返回。
         * 你可以在takeLeadership进行任务的分配等等,并且不要返回,
         * 如果你想要要此实例一直是leader的话可以加一个死循环。
         * @param client
         * @throws Exception
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            final int waitSeconds = (int) (5 * Math.random()) + 1;
            System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
            System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
            } catch (InterruptedException e) {
                System.err.println(name + " was interrupted.");
                Thread.currentThread().interrupt();
            } finally {
                System.out.println(name + " relinquishing leadership.\n");
            }
        }
    }
    public class LeaderSelectorDemo {
        protected static String PATH = "/francis/leader";
        private static final int CLIENT_QTY = 10;
    
    
        public static void main(String[] args) throws Exception {
            List<CuratorFramework> clients = Lists.newArrayList();
            List<LeaderSelectorAdapter> examples = Lists.newArrayList();
            String connectionInfo = "127.0.0.1:2181";
            try {
                //1、构建10个LeaderSelectorAdapter
                for (int i = 0; i < CLIENT_QTY; i++) {
                    CuratorFramework client
                            = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
                    clients.add(client);
                    LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                    examples.add(selectorAdapter);
                    //2、启动,并开始选举
                    client.start();
                    selectorAdapter.start();
                }
                System.out.println("Press enter/return to quit\n");
                new BufferedReader(new InputStreamReader(System.in)).readLine();
            } finally {
                System.out.println("Shutting down...");
                for (LeaderSelectorAdapter exampleClient : examples) {
                    CloseableUtils.closeQuietly(exampleClient);
                }
                for (CuratorFramework client : clients) {
                    CloseableUtils.closeQuietly(client);
                }
            }
        }
    }
    

    (3)小结
    对比可知,LeaderLatch必须调用close()方法才会释放领导权,而对于LeaderSelector,通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。从而,LeaderSelector具有更好的灵活性和可控性,建议有LeaderElection应用场景下优先使用LeaderSelector。

    3、分布式锁
    • 要点:

    1.推荐使用ConnectionStateListener监控连接的状态,因为当连接LOST时你不再拥有锁
    2.分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。
    (1)可重入共享锁Shared Reentrant Lock

    • Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似,即可重入, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。 它是由类InterProcessMutex来实现。 它的构造函数为:
    public InterProcessMutex(CuratorFramework client, String path)
    
    • 通过acquire()获得锁,并提供超时机制;并通过release()释放锁
    • 请求撤销当前的锁, 调用attemptRevoke()方法,注意锁释放时RevocationListener将会回调。
    public class FakeLimitedResource {
        private final AtomicBoolean inUse = new AtomicBoolean(false);
    
        public void use() throws InterruptedException {
            // 真实环境中我们会在这里访问/维护一个共享的资源
            //这个例子在使用锁的情况下不会非法并发异常IllegalStateException
            //但是在无锁的情况由于sleep了一段时间,很容易抛出异常
            if (!inUse.compareAndSet(false, true)) {
                throw new IllegalStateException("Needs to be used by one client at a time");
            }
            try {
                Thread.sleep((long) (3 * Math.random()));
            } finally {
                inUse.set(false);
            }
        }
    }
    public class InterProcessMutexDemo {
        private InterProcessMutex lock;
        private final FakeLimitedResource resource;
        private final String clientName;
    
        public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
            this.resource = resource;
            this.clientName = clientName;
            this.lock = new InterProcessMutex(client, lockPath);
        }
    
        public void doWork(long time, TimeUnit unit) throws Exception {
            //加锁、其他被阻塞
            if (!lock.acquire(time, unit)) {
                throw new IllegalStateException(clientName + " could not acquire the lock");
            }
            try {
                System.out.println(clientName + " get the lock");
                resource.use(); //access resource exclusively
            } finally {
                System.out.println(clientName + " releasing the lock");
                //解锁
                lock.release(); // always release the lock in a finally block
            }
        }
    
        private static final int QTY = 5;
        private static final int REPETITIONS = QTY * 10;
        private static final String PATH = "/examples/locks";
    
        public static void main(String[] args) throws Exception {
            final FakeLimitedResource resource = new FakeLimitedResource();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            String connectionInfo = "127.0.0.1:2181";
            try {
                for (int i = 0; i < QTY; ++i) {
                    final int index = i;
                    Callable<Void> task = new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            CuratorFramework client = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(1000, 3));
                            try {
                                client.start();
                                final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                                for (int j = 0; j < REPETITIONS; ++j) {
                                    example.doWork(10, TimeUnit.SECONDS);
                                }
                            } catch (Throwable e) {
                                e.printStackTrace();
                            } finally {
                                CloseableUtils.closeQuietly(client);
                            }
                            return null;
                        }
                    };
                    service.submit(task);
                }
                service.shutdown();
                service.awaitTermination(10, TimeUnit.MINUTES);
            } finally {
    
            }
        }
    }
    

    (2)不可重入共享锁—Shared Lock

    • 这个锁和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入,具体实现类为InterProcessSemaphoreMutex

    (3)可重入读写锁—Shared Reentrant Read Write Lock

    • 类似JDK的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁在使用时不允许读(阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 --->请求读锁--->释放读锁 ---->释放写锁。从读锁升级成写锁是不行的。可重入读写锁主要由两个类实现:InterProcessReadWriteLock、InterProcessMutex。使用时首先创建一个InterProcessReadWriteLock实例,然后再根据你的需求得到读锁或者写锁,读写锁的类型是InterProcessMutex。
      (4)信号量—Shared Semaphore
    • 一个计数的信号量类似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Curator中称之为租约(Lease)。 有两种方式可以决定semaphore的最大租约数。第一种方式是用户给定path并且指定最大LeaseSize。第二种方式用户给定path并且使用SharedCountReader类。如果不使用SharedCountReader, 必须保证所有实例在多进程中使用相同的(最大)租约数量,否则有可能出现A进程中的实例持有最大租约数量为10,但是在B进程中持有的最大租约数量为20,此时租约的意义就失效了。
    • 这次调用acquire()会返回一个租约对象。 客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。
    • 注意你可以一次性请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法。
      (5)多共享锁对象 —Multi Shared Lock
    • Multi Shared Lock是一个锁的容器。 当调用acquire(), 所有的锁都会被acquire(),如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。
    4、分布式计数器
    • 计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。

    (1)分布式int计数器—SharedCount

    • 这个类使用int类型来计数。 主要涉及三个类。SharedCount代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。

    SharedCount
    SharedCountReader
    SharedCountListener

    • 注意计数器必须start,使用完之后必须调用close关闭它。
      (2)分布式long计数器—DistributedAtomicLong
    • 再看一个Long类型的计数器。 除了计数的范围比SharedCount大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。此计数器有一系列的操作:

    get(): 获取当前值
    increment(): 加一
    decrement(): 减一
    add(): 增加特定的值
    subtract(): 减去特定的值
    trySet(): 尝试设置计数值
    forceSet(): 强制设置计数值

    5、分布式队列
    • 基本不会使用zk作为分布式队列使用
    • Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTS_EQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用Leader选举只让Leader作为唯一的消费者。
    6、分布式屏障—Barrier
    • 分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点继续进行,先关实现类DistributedBarrier

    转载:https://my.oschina.net/woter/blog/1933298

    相关文章

      网友评论

          本文标题:3、zk客户端curator使用(转)

          本文链接:https://www.haomeiwen.com/subject/uempnqtx.html