选举使用zk节点jobName/leader/election/latch
,通过使用curator框架完成主节点的选举。选举成功的节点(实例)会创建zk节点jobName/leader/election/instance
,节点值为当前节点实例ID(ip@-@pid)。
public void electLeader() {
log.debug("Elect a new leader now.");
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
log.debug("Leader election completed.");
}
通过上面代码可知,选举成功的节点会执行回调方法(io.elasticjob.lite.internal.election.LeaderService.LeaderElectionExecutionCallback#execute
):
if (!hasLeader()) {
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
任一作业服务器启动时,都会参与主节点的选举。如果选举成功,则会写入分片处理中标记,然后执行分片。
Leader Latch
In distributed computing, leader election is the process of designating a single process as the organizer of some task distributed among several computers (nodes). Before the task is begun, all network nodes are unaware which node will serve as the "leader," or coordinator, of the task. After a leader election algorithm has been run, however, each node throughout the network recognizes a particular, unique node as the task leader.
LeaderLatches must be started:
leaderLatch.start();
Once started, the LeaderLatch will negotiate with any other LeaderLatch participants that use the same latch path and randomly choose one of them to be the leader. At any time, you can determine if a given instance is the leader by calling:
public boolean hasLeadership()
Return true if leadership is currently held by this instance
Similar to the JDK's CountDownLatch, LeaderLatch has methods that block until leadership is acquired:
public void await()
throws InterruptedException,
EOFException
Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.
When you are through with the LeaderLatch instance, you must call close. This removes the instance from the leader election and releases leadership if the instance has it. Once leadership is released, another participating instance (if any) will be chosen as leader.
leaderLatch.close();
网友评论