
Curator Usage Examples

Author: 庙人 | Published: 2019-08-22 16:45

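Curator is the most widely used ZooKeeper client library.
(1) Typical ZooKeeper use cases: distributed locks and master election.
(2) These use cases have to cope with extreme conditions such as network access failures.
Curator's backoff algorithm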

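  1. Retry once every minute. If the network is blocked, the retries pile up:
    22:25 request1 (blocked)
    22:26 request2 (pointless)
    22:27 request3 (pointless)
    22:28 request4 (unblocked): request2, request3 and request4 all go through
  2. Retry at exponentially growing intervals, for example 1 minute before the first retry, 2 minutes before the second, and so on. The longer the outage lasts, the longer the retry interval becomes.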
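
The two strategies above can be compared with a small sketch. This is plain Java for illustration only and is not Curator's implementation: the class name BackoffSketch, both method names, and the 10-minute cap are assumptions made for this example. Curator's own ExponentialBackoffRetry also randomizes each sleep time, which this sketch leaves out.

```java
public class BackoffSketch {

    /** Fixed-interval policy: every retry waits the same amount of time. */
    public static long fixedDelayMs(int retryCount, long intervalMs) {
        return intervalMs;
    }

    /** Exponential policy: baseMs * 2^retryCount, capped at maxMs. */
    public static long exponentialDelayMs(int retryCount, long baseMs, long maxMs) {
        // Clamp the shift so very large retry counts cannot overflow the multiplication.
        int shift = Math.min(retryCount, 30);
        return Math.min(maxMs, baseMs * (1L << shift));
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;   // base interval from the example above
        long cap = 600_000L;        // hypothetical 10-minute upper bound
        for (int retry = 0; retry < 5; retry++) {
            System.out.println("retry " + retry
                    + ": fixed=" + fixedDelayMs(retry, oneMinute) + " ms"
                    + ", exponential=" + exponentialDelayMs(retry, oneMinute, cap) + " ms");
        }
    }
}
```

With a 1-minute base, the exponential policy waits 1, 2, 4 and 8 minutes and is then held at the 10-minute cap, while the fixed policy keeps sending a request every minute into a network that is still blocked.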

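Curator event listening
NodeCache: watches a single node (backed by a local cache). Callback interface: NodeCacheListener
PathChildrenCache: caches child nodes and handles changes to them. Callback interface: PathChildrenCacheListener
TreeCache: a combination of NodeCache and PathChildrenCache. Callback interface: TreeCacheListener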

pom file
<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
</dependencies>
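Creating a node
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CreateNodeDemo {
    public static void main(String[] args) throws Exception {
        String path = "/zk-client/c1";
        // Retry policy: base sleep time of 1000 ms, at most 3 retries.
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("10.143.143.185:6181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        // Create a persistent node, creating any missing parent nodes first.
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "test".getBytes());
        client.close();
    }
}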

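Creating a node asynchronously
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CreateNodeAsyncDemo {
    // One count per background create call, so main() waits for both callbacks.
    static CountDownLatch cdl = new CountDownLatch(2);
    static ExecutorService es = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        String path = "/zk-client";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("10.143.143.185:6181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        // First call: the callback runs on the supplied executor "es".
        // If the node does not exist yet, the result code is 0 (OK).
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
                cdl.countDown();
            }
        }, es).forPath(path, "test".getBytes());

        // Second call: no executor is given, so the callback runs on ZooKeeper's event thread.
        // It targets the same path, so the result code is -110 (NodeExists).
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
                cdl.countDown();
            }
        }).forPath(path, "test".getBytes());

        // Block until both callbacks have fired, then release resources.
        cdl.await();
        es.shutdown();
        client.close();
    }
}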
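Deleting a node
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class DeleteNodeDemo {
    public static void main(String[] args) throws Exception {
        String path = "/zk-client/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("10.143.143.185:6181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "test".getBytes());
        // Read the node so that its current version is stored in stat.
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        // Delete the node (and any children) only if its version still matches.
        client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
        client.close();
    }
}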
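Curator event listening

NodeCache: watches a single node (backed by a local cache). Callback interface: NodeCacheListener

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class NodeCacheDemo {
    public static void main(String[] args) throws Exception {
        String path = "/zk-client/nodecache";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("10.143.143.185:6181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "test".getBytes());

        // Third argument: whether the node data is compressed.
        NodeCache nc = new NodeCache(client, path, false);
        // Build the initial cache synchronously so the listener only reports later changes.
        nc.start(true);
        nc.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                // getCurrentData() returns null once the node has been deleted.
                ChildData data = nc.getCurrentData();
                if (data == null) {
                    System.out.println("node deleted");
                } else {
                    System.out.println("update--current data: " + new String(data.getData()));
                }
            }
        });

        client.setData().forPath(path, "test123".getBytes());
        Thread.sleep(1000);
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(5000);
        nc.close();
        client.close();
    }
}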

PathChildrenCache: caches child nodes and handles changes to them. Callback interface: PathChildrenCacheListener
TreeCache: a combination of NodeCache and PathChildrenCache. Callback interface: TreeCacheListener


Article link: https://www.haomeiwen.com/subject/snznsctx.html