分布式批处理平台主要由三部分组成:
![](https://img.haomeiwen.com/i5968502/487d627580850330.png)
1、作业调度管控中心
本部分基于SOFA基础技术平台实现,相关部署操作同SOFA基础技术平台,主要提供作业管理、作业监控、作业调度功能,具体为:
a)作业管理:提供作业、作业流元信息的上传,用于管理作业运行规则、失败处理策略、运行参数的配置,失败作业、失败分片的重做处理等。
b)作业监控:提供作业运行时的实时状态监控、作业运行历史信息、作业节点的资源健康状况等。
c)作业调度:根据配置的作业执行计划进行作业的自动调度,也可以由人工发起的手动进行调度作业。
2、协调注册中心
提供作业元数据的存储、作业节点健康状态的监控、作业运行状态的记录、主节点的选举、作业节点的协调等服务。是一个独立部署的中间件,采用sokeeper实现。关于sokeeper的部署、使用请参考版本发布目录:ftp://******/7_TRAINER/1_sokeeper
3、分布式批处理框架
此部分为批处理开发的核心,提供批处理编程模型、作业的运行机制、作业失败处理、dag的执行等,监听并处理从协调注册中心发来的作业命令(如执行、中断等)。本框架不依赖SOFA平台,可用于采用java1.6或以上版本开发的业务系统。
4、开发示例
/** * */package com.yss.sofa.middleware.wolf.demo.pi;import org.springframework.transaction.annotation.Transactional;import com.yss.sofa.middleware.common.log.Logger;import com.yss.sofa.middleware.common.log.LoggerFactory;import com.yss.sofa.middleware.wolf.api.server.JobContext;import com.yss.sofa.middleware.wolf.api.server.Worker;import com.yss.sofa.middleware.wolf.api.server.WorkerException;/** * pi计算的具体功能业务实现,调用单位为作业分片,即本类的doWork是以作业分片的维度进行调用的; * 注意,需要避免共享变量,以免造成结果的不正确性. * * @author lenglinyong * @version 1.0, 2016年7月3日 * @since 1.0, 2016年7月3日 */public class PIWorker implements Worker{/** * Logger for this class */private static final Logger logger = LoggerFactory.getLogger(PIWorker.class.getName());/** * 计算pi的开始 */public static String START_INDEX_KEY = "START_INDEX";/** * 计算pi的最大次数 */public static String END_INDEX_KEY = "END_INDEX";/** * 分片的最大运算次数 */public static String STEP_MAX_INDEX_KEY = "STEP_MAX_INDEX_KEY";public static long MAXLOOP = 100000000;@Override@Transactionalpublic Double doWork(JobContextcontext) {
logger.debug("doWork(context=" + context + ")"); //$NON-NLS-1$ //$NON-NLS-2$
// 以下表示分片的维度、分片的参数;其值的设置是通过PIPartitioner类完成的,在这里进行使用
long startIdx = context.getJobPartition().getLongParameter(START_INDEX_KEY);
long stepEndIdx = context.getJobPartition().getLongParameter(STEP_MAX_INDEX_KEY);
long maxLoopCnt = context.getJobPartition().getLongParameter(END_INDEX_KEY);
// 异常检查,作业的相关异常需抛出WorkerException
if (startIdx < 0) {
throw new WorkerException("DEMO-000000U", START_INDEX_KEY + "=" + startIdx);
}
double pi = 0.0;
for (double i = startIdx; i < stepEndIdx; i++) {
// 中断埋点,wolf引擎接收到作业中断命令时,context.isInterrupted()返回true,可退出业务处理
if (context.isInterrupted()) {
logger.info(String.format("当前节点中断PI作业,节点【%s】,当前分片【%s】,当前数据【%s】,正在计算pi值...", context.getRunAtServer(),context.getJobPartition().getName(), i+""));
break;
}
double x = (i + 0.5) * (1.0 / maxLoopCnt);
pi = pi + 4.0 / (1.0 + x * x);
if (((int) i) % 100000 == 1) {
logger.info(String.format("当前节点【%s】,当前分片【%s】,当前数据【%s】,正在计算pi值...", context.getRunAtServer(),context.getJobPartition().getName(), i+""));
}
if (((int) i) % 10000 == 1) {
context.doProcess(((float) i) / (stepEndIdx - startIdx), "正在计算PI值...");
}
}
// 作业的日志信息,作业开始、结束、以及失败的日志wolf框架自动记录,在本代码中无需重复记录。
// 采用context的相关接口记录日志,用于描述业务中间过程的处理情况等。
context.info("startIdx=" + startIdx + ",endIdx=" + stepEndIdx + "计算结果为:" + pi);
return pi;// 返回本分片的运行结果
}
}
网友评论