简介
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud以私有云平台的方式提供集资源、调度以及分片为一体的全量级解决方案,依赖Mesos和Zookeeper。
概述
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。提供个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
架构图
Elastic-job架构图Elastic-job-lite
Elastic-job-lite 是一种轻量级的分布式任务调度的解决方案,基于Quartz实现任务的调度和基于zookeeper来协调多个作业服务器。
Elastic-job-lite 分片的时机
首先需要时候分片通过调用setReshardingFlag() 函数来实现,
在zk创建持久节点 /{jobname}/leader/sharding/necessary
public void setReshardingFlag() {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}
以下情况会触发分片
1.ReconcileService#runOneIteration()
ReconcileServicd继承com.google.common.util.concurrent.AbstractScheduledService
来实现定时任务,每隔1分钟运行一次
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
runOneIteration
@Override
protected void runOneIteration() {
//加载job的配置
JobConfiguration config = configService.load(true);
//默认的协调间隔是10分钟
int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
// 成立条件:协调时间间隔大于0 && 系统当前时间 - 上一次协调时间 >= 10min
if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
lastReconcileTime = System.currentTimeMillis();
//成立条件:自己是leader && zk下没有 /${namespace}/${jobname}/leader/sharding/necessary这个节点 && servers节点下有下线的机器
if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
//创建重新分片
shardingService.setReshardingFlag();
}
}
}
2.作业服务器启动,job初始化是想zk注册job信息的时候,创建需要分片的节点
SetUpFacade#registerStartUpInfo()
public void registerStartUpInfo(final boolean enabled) {
// 注册监听器
listenerManager.startAllListeners();
//选主
leaderService.electLeader();
//在servers节点
serverService.persistOnline(enabled);
//在instance节点
instanceService.persistOnline();
//在需要分片节点
shardingService.setReshardingFlag();
//启动reconcileService定时任务
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
3.监听节点,当servers节点变化时候触发重新分片
ShardingListenerManager.ListenServersChangedJobListener
class ListenServersChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
//成立条件job存在 && (是instance节点产生的变化 或者是server节点产生的变化)
if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
shardingService.setReshardingFlag();
}
}
private boolean isInstanceChange(final Type eventType, final String path) {
return instanceNode.isInstancePath(path) && Type.NODE_CHANGED != eventType;
}
private boolean isServerChange(final String path) {
return serverNode.isServerPath(path);
}
}
4.监听节点,当config节点变化时候触发重新分片
ShardingTotalCountChangedJobListener
class ShardingTotalCountChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
// 判断条件:变化节点是config && 当前分片总数不为0
if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
//获取sharding count
int newShardingTotalCount = YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
//如果获取的sharding count 和 本地的sharding count不同 设置重新分片
if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
shardingService.setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
}
}
}
}
网友评论