Copyright notice: this is an original article by CSDN blogger "hosaos", released under the CC 4.0 BY-SA license. Please include the original link and this notice when reposting.
Original link: https://blog.csdn.net/hosaos/article/details/88727817
I remember interviewing at a large internet company years ago, when the interviewer asked me: how is your company's distributed scheduling system designed?
At my previous company, the distributed scheduler had a single Master that fetched pending Tasks from the database and dispatched them to different machines for execution. At the time I naively assumed that was "distributed" - multiple machines executing tasks in parallel, isn't that distributed? The interviewer promptly tore that apart and asked me a question I still remember vividly: you call a single-point Master "distributed"?
Ha, looking back I really was clueless. What happens when the Master goes down? The whole system grinds to a halt.
In a distributed task-scheduling system, a Master cluster is essential for reliability. But to guarantee that a task is never assigned twice, only one node may do the assigning, so a Leader must be elected from the Master cluster to pull tasks from the task pool. This article covers the usage and internals of LeaderLatch and LeaderSelector, the leader-election recipes Curator builds on top of ZooKeeper.
LeaderLatch
Basic principle
Pick a root path, e.g. "/leader_latch". Multiple machines concurrently create ephemeral sequential nodes under that path, such as "/leader_latch/node_3", "/leader_latch/node_1", "/leader_latch/node_2". The zk client that owns the node with the smallest sequence number (node_1 here) becomes leader. Every node that lost the election watches the node immediately before its own for deletion; once that node is deleted, the election is re-run.
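The ordering rule above can be illustrated with a small, ZooKeeper-free sketch (the node names here are made up for illustration): sort the children by their sequence suffix, the smallest wins, and every other node watches its immediate predecessor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ElectionOrderSketch {
    // Parse the numeric suffix of a sequential node name such as "node_3".
    static int sequenceOf(String node) {
        return Integer.parseInt(node.substring(node.lastIndexOf('_') + 1));
    }

    public static void main(String[] args) {
        List<String> children = new ArrayList<>(Arrays.asList("node_3", "node_1", "node_2"));
        children.sort(Comparator.comparingInt(ElectionOrderSketch::sequenceOf));
        // the smallest sequence number wins the election
        System.out.println("leader: " + children.get(0));
        // each losing node watches its immediate predecessor rather than the
        // leader itself, so one deletion wakes only one waiter (no herd effect)
        for (int i = 1; i < children.size(); i++) {
            System.out.println(children.get(i) + " watches " + children.get(i - 1));
        }
    }
}
```

Watching only the predecessor is the key design choice: when a node disappears, exactly one successor re-checks the election instead of every client stampeding zk at once.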
Key APIs and methods
1. LeaderLatch
org.apache.curator.framework.recipes.leader.LeaderLatch
Its key methods are:
//start() begins the election
void start()
//close() releases leadership
void close()
//await() blocks the calling thread until leadership is acquired or the timeout elapses; returns whether leadership was acquired
boolean await(long, java.util.concurrent.TimeUnit)
//returns whether this latch currently holds leadership
boolean hasLeadership()
2. LeaderLatchListener
org.apache.curator.framework.recipes.leader.LeaderLatchListener
LeaderLatchListener is the callback interface invoked when a LeaderLatch client's leadership status changes; it has two methods, isLeader() and notLeader():
//invoked when this latch acquires leadership
void isLeader()
//invoked when this latch loses leadership
void notLeader()
Usage
First, the LeaderLatch constructors:
public LeaderLatch(CuratorFramework client, String latchPath)
{
this(client, latchPath, "", CloseMode.SILENT);
}
public LeaderLatch(CuratorFramework client, String latchPath, String id)
{
this(client, latchPath, id, CloseMode.SILENT);
}
public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
{
this.client = Preconditions.checkNotNull(client, "client cannot be null");
this.latchPath = PathUtils.validatePath(latchPath);
this.id = Preconditions.checkNotNull(id, "id cannot be null");
this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
The parameters are:
Parameter | Description |
---|---|
client | the zk client instance |
latchPath | root path for the leader election |
id | client id, used to identify the client, e.g. a number or name |
closeMode | close strategy for the latch: SILENT does not fire listener callbacks on close (the default), NOTIFY_LEADER does |
To register a listener, call addListener():
leaderLatch.addListener(new LeaderLatchListener() {
    @Override
    public void isLeader() {
        // invoked when leadership is acquired
    }
    @Override
    public void notLeader() {
        // invoked when leadership is lost
    }
});
Below is a test program of mine that simulates 10 clients competing for leadership; once a client becomes leader, it calls close() to release leadership and drop out of the election.
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchTest {
    static int CLIENT_COUNT = 10;
    static String LOCK_PATH = "/leader_latch";

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clientsList = Lists.newArrayListWithCapacity(CLIENT_COUNT);
        List<LeaderLatch> leaderLatchList = Lists.newArrayListWithCapacity(CLIENT_COUNT);
        // create 10 zk clients to simulate a leader election
        for (int i = 0; i < CLIENT_COUNT; i++) {
            CuratorFramework client = getZkClient();
            clientsList.add(client);
            LeaderLatch leaderLatch = new LeaderLatch(client, LOCK_PATH, "CLIENT_" + i);
            leaderLatchList.add(leaderLatch);
            // start() must be called to join the election
            leaderLatch.start();
        }
        // find out which client is the current leader
        checkLeader(leaderLatchList);
    }

    private static void checkLeader(List<LeaderLatch> leaderLatchList) throws Exception {
        // the election takes time - wait 10 seconds
        Thread.sleep(10000);
        for (int i = 0; i < leaderLatchList.size(); i++) {
            LeaderLatch leaderLatch = leaderLatchList.get(i);
            // hasLeadership() tells whether this latch is the current leader
            if (leaderLatch.hasLeadership()) {
                System.out.println("current leader: " + leaderLatch.getId());
                // release leadership and re-run the election
                leaderLatch.close();
                checkLeader(leaderLatchList);
            }
        }
    }

    private static CuratorFramework getZkClient() {
        String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkServerAddress)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        zkClient.start();
        return zkClient;
    }
}
Note that close() must be called manually to release leadership.
The console prints each of the 10 nodes becoming leader in turn. Looking at the configured path (latchPath) on the zk server, each child node name carries a sequence suffix. These are the ephemeral nodes the clients created under latchPath during the election, all named in the form xxxxxx-latch-n, where n is the ephemeral sequential node number.
(screenshot of the latchPath children omitted)
Source-code analysis
As usual, let's start from the start() method:
public void start() throws Exception
{
    // an AtomicReference CAS guards against starting more than once
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                // once the connection to the zk server is established,
                // internalStart() performs the initialization
                internalStart();
            }
            finally
            {
                startTask.set(null);
            }
        }
    }));
}
internalStart() is shown below; note the synchronized keyword:
private synchronized void internalStart()
{
    if ( state.get() == State.STARTED )
    {
        // register a connection listener; on a reconnect event the
        // listener also calls reset()
        client.getConnectionStateListenable().addListener(listener);
        try
        {
            // initialize
            reset();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            log.error("An error occurred checking resetting leadership.", e);
        }
    }
}
The reset() method:
void reset() throws Exception
{
    // mark ourselves as not being leader
    setLeadership(false);
    setNode(null);

    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( debugResetWaitLatch != null )
            {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }

            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                setNode(event.getName());
                if ( state.get() == State.CLOSED )
                {
                    setNode(null);
                }
                else
                {
                    // fetch the children of latchPath and check leadership
                    getChildren();
                }
            }
            else
            {
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };

    // create an ephemeral sequential node under latchPath whose content is the
    // client id, registering the callback above to run asynchronously
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
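Note the withProtection() call: Curator prefixes the node name so that a client can recognize its own node again if a connection loss happens mid-create. A rough, illustrative model of the resulting name is sketched below; the exact "_c_&lt;uuid&gt;-" prefix format is an assumption based on Curator's protection scheme, and only the trailing sequence number matters for the election.

```java
import java.util.UUID;

public class ProtectedNameSketch {
    // Approximate shape of a node created with withProtection() plus
    // EPHEMERAL_SEQUENTIAL: "_c_<uuid>-" + lock name + zero-padded sequence
    // appended by ZooKeeper. This format is an assumption for illustration.
    static String protectedName(String lockName, int sequence) {
        return "_c_" + UUID.randomUUID() + "-" + lockName + String.format("%010d", sequence);
    }

    // The sorter must compare only the 10-digit sequence suffix, ignoring
    // whatever protection prefix precedes it.
    static int sequenceOf(String node) {
        return Integer.parseInt(node.substring(node.length() - 10));
    }

    public static void main(String[] args) {
        String name = protectedName("latch-", 7);
        System.out.println(name + " -> sequence " + sequenceOf(name));
    }
}
```

This is why the sort in checkLeadership() goes through a sorter keyed on the sequence suffix rather than plain lexicographic order on the full name.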
After the ephemeral sequential node is created, the BackgroundCallback invokes getChildren():
private void getChildren() throws Exception
{
    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                checkLeadership(event.getChildren());
            }
        }
    };
    // list the children of latchPath; on success the callback above fires asynchronously
    client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}
Finally, once the children of latchPath have been retrieved, checkLeadership() runs. This method is the core of the recipe, so pay close attention:
private void checkLeadership(List<String> children) throws Exception
{
    final String localOurPath = ourPath.get();
    // sort the children by sequence number
    List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
    int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
    if ( ourIndex < 0 )
    {
        log.error("Can't find our node. Resetting. Index: " + ourIndex);
        reset();
    }
    else if ( ourIndex == 0 )
    {
        // our node has the smallest sequence number: the election is won,
        // so mark ourselves as leader
        setLeadership(true);
    }
    else
    {
        // election lost: watch the node immediately before ours
        // (the one with the next-smaller sequence number)
        String watchPath = sortedChildren.get(ourIndex - 1);
        Watcher watcher = new Watcher()
        {
            @Override
            public void process(WatchedEvent event)
            {
                // when the predecessor is deleted, re-enter getChildren()
                // to check whether we have become leader
                if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                {
                    try
                    {
                        getChildren();
                    }
                    catch ( Exception ex )
                    {
                        ThreadUtils.checkInterrupted(ex);
                        log.error("An error occurred checking the leadership.", ex);
                    }
                }
            }
        };

        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                {
                    // previous node is gone - reset
                    reset();
                }
            }
        };
        // register the watcher for the predecessor's deletion; the background
        // callback re-runs the election if the predecessor is already gone
        client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
    }
}
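The three branches of checkLeadership() can be condensed into a small decision function. This is a plain-string simulation for clarity, not Curator code:

```java
import java.util.Arrays;
import java.util.List;

public class CheckLeadershipSketch {
    // Mirrors checkLeadership(): our node missing -> reset; our node first
    // -> we are leader; otherwise -> watch the predecessor for deletion.
    static String decide(List<String> sortedChildren, String ourNode) {
        int ourIndex = sortedChildren.indexOf(ourNode);
        if (ourIndex < 0) {
            return "reset";                                     // our node vanished
        } else if (ourIndex == 0) {
            return "leader";                                    // smallest sequence wins
        } else {
            return "watch " + sortedChildren.get(ourIndex - 1); // wait on predecessor
        }
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "latch-0000000001", "latch-0000000002", "latch-0000000003");
        System.out.println(decide(children, "latch-0000000001")); // leader
        System.out.println(decide(children, "latch-0000000003")); // watch latch-0000000002
        System.out.println(decide(children, "latch-0000000009")); // reset
    }
}
```

The "reset" branch matters in practice: if our ephemeral node disappeared (e.g. a session expiry during a partition), the latch re-creates it and rejoins the election rather than silently dropping out.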
The core flow is:
- every zk client creates an ephemeral sequential node under the same path; the creation triggers an asynchronous callback
- in the callback, each client checks whether its own node has the smallest sequence number
- if it does, it has won the election; if not, it watches its predecessor (the node with the next-smaller sequence number) for deletion, and when that deletion fires, the election is re-checked
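The steps above can be replayed with an in-memory stand-in for the latch path. This is a toy model (no real ZooKeeper): create three nodes, observe the leader, delete the leader's node, and watch the successor get promoted.

```java
import java.util.TreeMap;

public class LatchFlowSketch {
    // In-memory stand-in for the latch path: sequence number -> client id.
    private final TreeMap<Integer, String> children = new TreeMap<>();

    // like an ephemeral sequential create under latchPath
    void create(int sequence, String clientId) { children.put(sequence, clientId); }

    // like close() or a session expiry deleting the ephemeral node
    void delete(int sequence) { children.remove(sequence); }

    // the client owning the smallest sequence number is the leader
    String leader() { return children.isEmpty() ? null : children.firstEntry().getValue(); }

    public static void main(String[] args) {
        LatchFlowSketch path = new LatchFlowSketch();
        path.create(1, "A");
        path.create(2, "B");
        path.create(3, "C");
        System.out.println(path.leader()); // A owns the smallest node
        path.delete(1);                    // A's node deleted -> B's watcher fires
        System.out.println(path.leader()); // B is promoted
    }
}
```

This also explains the output of the earlier test program: each close() deletes the current leader's ephemeral node, so the remaining clients are promoted one by one in sequence order.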