简介
Curator是Netflix公司提供的一个Zookeeper开源客户端。相比官方的Apache zookeeper客户端,curator抽象层次更高,使用更为简便。
Curator recipes为我们实现了分布式系统中常见的,借助zookeeper的功能。例如leader选举,分布式锁,屏障和计数器等。这里介绍下如何使用Curator recipies提供的leader选举功能。
引入依赖
新建maven项目,在pom
文件引入如下依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
示例程序
Demo.java
public static void main(String[] args) throws Exception {
String nodeName = args.length > 0 ? args[0] : "Default";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(2000).retryPolicy(retryPolicy).build();
// 启动curatorFramework
curatorFramework.start();
// 创建LeaderLatch。注意第二个参数latch path
LeaderLatch leaderLatch = new LeaderLatch(curatorFramework, "/leader_demo");
leaderLatch.start();
// 设置leader状态变更监听器
leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println(nodeName + " is leader");
}
@Override
public void notLeader() {
System.out.println(nodeName + " is not Leader");
}
});
// 休息一段时间再调用hasLeadership方法,等待zookeeper通信完毕
Thread.sleep(500);
// 获取当前leader状态,是否是leader
String hasLeadershipHint = leaderLatch.hasLeadership() ? "Has leadership" : "Do not have leadership";
System.out.println(hasLeadershipHint);
// 如果不是leader方法会阻塞,直到自己成为leader才会返回
leaderLatch.await();
System.out.println(nodeName + " is working as leader");
Thread.sleep(10000);
// 使用完毕之后要关闭leaderLatch
leaderLatch.close();
curatorFramework.close();
}
创建LeaderLatch需要两个参数:curator框架的client和latch path。其中latch path 参数至关重要。要求参与leader选举的所有节点必须使用相同的latch path。从另一个角度理解就是说,Curator保证,使用相同latch path的节点,同一时间只会有一个获取到leader角色。
使用LeaderLatch之前,必须调用它的start
方法,开始选举过程。
LeaderLatch的addListener
方法为LeaderLatch增加一个监听器。该监听器会在leader状态发生变化之时(即hasLeadership
)调用。
hasLeadership
方法返回是否为leader角色。
await
方法调用的时候,如果目前为leader角色,方法会立即返回并继续向下运行。如果当前不是leader角色,该方法会阻塞,直到被中断,leaderlatch被关闭或者获取到leader角色。
await
方法还有一个重载版本await(long timeout, TimeUnit unit)
。这个方法可以指定最大等待时间。如果当前不为leader,运行这个方法的时候会阻塞,直到阻塞时间超过最大等待时间。
分别使用参数node1
,和node2
连续运行此段程序两次。我们会发现node1输出为:
node1 is leader
Has leadership
node1 is working as leader
node1 在10秒睡眠期间仍然是leader角色,因此node2的输出会阻塞到:
Do not have leadership
直到node1执行完退出,node2才从await方法返回,继续执行。
网友评论