Curator是zookeeper使用最广泛的工具
(1)zk的应用场景:分布式锁、Master选举
(2)zk的应用场景有网络访问异常等极端情况
Curator的Backoff退避算法
- 每1分钟重试一次,如果网络出现阻塞。
22:25 request1(block)
22:26 request2(毫无意义)
22:27 request3(毫无意义)
22:28 request4(通顺)request2、3、4 - 按照指数间隔重试,比如第一次1分钟,第二次2分钟......随着时间的推移,重试间隔越长。
Curator事件监听
NodeCache:节点处理监听(会使用缓存)。回调接口NodeCacheListener
PathChildrenCache:子节点缓存,处理子节点变化。回调接口PathChildrenCacheListener
TreeCache:NodeCache和PathChildrenCache的结合体。回调接口TreeCacheCacheListener
pom文件
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
创建节点
public class CreateNodeDemo {
public static void main(String[] args) throws Exception {
String path = "/zk-client/c1";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("10.143.143.185:6181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "test".getBytes());
}
}
image.png
异步创建节点
public class CreateNodeAsyncDemo {
static CountDownLatch cdl = new CountDownLatch(2);
static ExecutorService es = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws Exception {
String path = "/zk-client";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("10.143.143.185:6181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
cdl.countDown();
}
}, es).forPath(path, "test".getBytes());
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
cdl.countDown();
}
}).forPath(path, "test".getBytes());
cdl.await();
es.shutdown();
}
}
image.png
删除节点
public class DeleteNodeDemo {
public static void main(String[] args) throws Exception {
String path = "/zk-client/c1";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("10.143.143.185:6181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "test".getBytes());
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
}
}
Curator事件监听
NodeCache:节点处理监听(会使用缓存)。回调接口NodeCacheListener
public class NodeCacheDemo {
public static void main(String[] args) throws Exception {
String path = "/zk-client/nodecache";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("10.143.143.185:6181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "test".getBytes());
NodeCache nc = new NodeCache(client, path, false);
nc.start();
nc.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("update--current data: " + new String(nc.getCurrentData().getData()));
}
});
client.setData().forPath(path, "test123".getBytes());
Thread.sleep(1000);
client.delete().deletingChildrenIfNeeded().forPath(path);
Thread.sleep(5000);
nc.close();
}
}
image.png
PathChildrenCache:子节点缓存,处理子节点变化。回调接口PathChildrenCacheListener
TreeCache:NodeCache和PathChildrenCache的结合体。回调接口TreeCacheCacheListener
网友评论