执行节点启动说明
/**
* 任务节点启动
* @param rootPath 根路径
* @param taskPath 任务路径
* @param taskPath 调度路径
* @param taskService 业务service
*/
public static void startup(String rootPath, String taskPath, String schedulePath, TaskService taskService) {
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
TaskWatcher watcher = new TaskWatcher(countDownLatch, taskService, taskPath, schedulePath);
ZookeeperTaskUtil.init(watcher);
ZooKeeper zk = ZookeeperTaskUtil.getZookeeper();
if(zk.exists(rootPath, watcher) == null) {
zk.create(rootPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if(zk.exists(taskPath , watcher) == null) {
zk.create(taskPath, ByteUtil.objectToByte(new ArrayList<TaskData>()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.exists(taskPath, watcher);
}
String nodeName = zk.create(taskPath+"/task_node", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(nodeName);
watcher.setNodeName(nodeName.substring(nodeName.lastIndexOf("/")+1, nodeName.length()));
countDownLatch.countDown();
while(true) {
Thread.sleep(Integer.MAX_VALUE);
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("任务节点启动失败");
} finally {
}
}
执行节点启动时,需要做以下几个事情
- 创建zk的连接、定义监听watcher
- 加入对taskPath节点的子节点监听
- 创建临时调度节点
执行节点业务处理
执行节点的业务比较简单,就做一件事情,处理分配来的数据
/**
* process是阻塞方法
* 这里要注意,业务处理是开辟线程处理的,所以List<TaskData>里的任务会并发执行,这里要注意同时放到List<TaskData>的任务不能有执行顺序的要求
* 未来如果支持了任务的优先级,那么同一优先级的任务也不能有执行顺序的要求
*/
public void process(WatchedEvent event) {
System.out.println("TaskWatcher event " + event.getPath() + " " + event.getType() + " " + event.getState());
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if(event.getType() == EventType.NodeDataChanged) {
if (event.getPath().equals(taskPath)) {
if(taskService != null) {
taskService.initProperties(ZookeeperTaskUtil.getZookeeper(), this, nodeName, taskPath, schedulePath);
}
pool.execute(taskService);
}
}
}
首先需要找出自己节点名下且还未处理过的TaskData,先更新状态为处理中,如果更新成功,就开始对数据进行处理。
/**
* 1. 更新这个taskData的状态
* 2. 执行每一个数据id,并记录每一条id的成功或失败结果
* 3. 处理完成之后,把这个taskData的数据从任务节点和调度节点的数据列表中删除
*/
public void findTaskDataToProcess() {
try {
// 先取版本,后取数据
Stat stat = zk.exists(taskPath, watcher);
List<TaskData> taskDatas = (List<TaskData>) ByteUtil.byteToObject(zk.getData(taskPath, watcher, null));
if (taskDatas != null && taskDatas.size() > 0) {
for (TaskData taskData : taskDatas) {
// 如果需要处理的节点是本节点且这段数据还处于未处理的状态
if (nodeName.equals(taskData.getNodeName()) && taskData.getStatus() == Constant.STATUS_TASKDATA_NOSTART) {
// 更新这个taskData的状态
changeDoingStatusForTaskData(taskDatas, taskData, stat);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 注意:这里更新时候可能会出现版本不一致的情况
private void changeDoingStatusForTaskData(List<TaskData> taskDatas, TaskData taskData, Stat stat) throws Exception {
try {
taskData.setStatus(Constant.STATUS_TASKDATA_DOING);
stat = zk.setData(taskPath, ByteUtil.objectToByte(taskDatas), stat.getVersion());
// taskData被更新成功,可以对其进行业务处理
if (stat != null) {
handleTaskData(taskData);
}
} catch (KeeperException.BadVersionException e) {
// 如果发生版本错误的异常
System.out.println("出现版本异常" + e.getMessage());
getNewDataForUpdateForDoingStatus();
}
}
业务处理主要分为两步
- 调用业务具体实现doAction(),并记录成功和失败的数据
- 将成功的数据在zk里进行删除;将失败的数据再次放回到调度节点中
/**
* 任务处理
*
* @param taskData
* @throws Exception
*/
private void handleTaskData(TaskData taskData) throws Exception {
//获取任务执行结果
TaskProgress progress = getTaskProcess(taskData);
//处理完成之后,更新这个taskData从root的列表中删除
finishTaskData(progress, taskData);
}
/**
* 获取任务的执行结果
*
* @param taskData
* @return
* @throws Exception
*/
private TaskProgress getTaskProcess(TaskData taskData) throws Exception {
TaskProgress progress = new TaskProgress();
String[] strArr = taskData.getDataIds().split(",");
for (String data : strArr) {
if (data != null) {
if (data.contains("-")) {
int left = Integer.valueOf(data.split("-")[0]);
int right = Integer.valueOf(data.split("-")[1]);
for (int i = left; i <= right; i++) {
//执行任务
doAction(taskData, progress , i);
}
} else {
doAction(taskData, progress, Integer.valueOf(data));
}
}
}
return progress;
}
/**
* 执行任务
*
* @param taskData 任务对象
* @param progress 记录成功和失败的id
* @param id 任务数据的id
* @throws Exception
*/
private void doAction(TaskData taskData, TaskProgress progress, int id) throws Exception {
try {
//如果错误id的执行次数超过了100次,就默认为执行成功,是为了防止无限次错误调用
if(taskData.getVersion() <= 100) {
doAction(id, taskData.getVersion());
}
//每成功处理完一个数据,就要把id加到完成队列(字符串)里--memcache
progress.setSuccessIds(NumberConcatUtil.concatNumber(progress.getSuccessIds(), id));
memcacheComp.set(taskData.getId() + "_" + taskData.getTaskId() + "_success", progress.getSuccessIds());
} catch (TaskHandleException e) {
// 注意:这里约束调用方如果业务执行出错,需抛出TaskHandleException
// 处理过程中如果出错,则把错误的id加到errorIds中
progress.setErrorIds(NumberConcatUtil.concatNumber(progress.getErrorIds(), id));
memcacheComp.set(taskData.getId() + "_" + taskData.getTaskId() + "_error", progress.getErrorIds());
}
}
这里面的doAction方法交由调用方实现
/**
* 业务处理方法
* @param id 任务数据id
* @param version 该任务id被执行的次数,当id执行错误重新执行一次,这个version就会加1
* 这里要注意,为了防止大量的重复错误调用,业务方需要根据version的值做特殊处理,默认执行100次之后就会认为成功
* @throws TaskHandleException 如果执行时抛出此异常,表示任务该id执行失败,需重新执行
*/
public abstract void doAction(long id, int version) throws TaskHandleException;
前面的过程是业务执行过程,当任务执行顺利完成后,将开始第二步处理。
/**
* 业务执行完成后,对taskData进行后续处理
* 1. 处理newData列表的数据:newData中去掉这个task里的dataids(包含成功的和失败的,因为失败的又补充到后面了),加上执行错误的内容
* 2. 处理taskData列表的数据:第一步成功之后,TaskData可以从列表中直接移除
* @param progress
* @param taskData
*/
private void finishTaskData(TaskProgress progress, TaskData taskData) {
// 封装上次处理错误需要再次处理的NewData对象
NewData errorData = encapsulateNewDataByErrorIds(progress.getErrorIds(), taskData);
try {
//对newDataList列表中的数据进行更新处理
boolean flag = updateNewDataList(taskData, errorData);
if (flag) {
// newDataList更新成功,则再去处理taskDataList
updateTaskDataList(taskData);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
封装errorData就不多说了,主要是更新NewData列表和TaskData列表这两个方法。
/**
* 更新NewData列表
*/
private boolean updateNewDataList(TaskData taskData, NewData errorData) throws KeeperException, InterruptedException {
try {
// task节点不需要监听新数据节点的变化
Stat stat = zk.exists(schedulePath, false);
List<NewData> newDataList = (List<NewData>) ByteUtil.byteToObject(zk.getData(schedulePath, false, null));
Iterator<NewData> it = newDataList.iterator();
while (it.hasNext()) {
NewData newData = it.next();
if (newData.getTaskId() == taskData.getTaskId() && newData.getStatus() == Constant.STATUS_NEWDATA_PUBLISH) {
// 剩余还处于执行状态下的dataIds
String surplusDoingDataIds = NumberSubstringUtil.substringNumber(newData.getDataIds(), taskData.getDataIds());
if (surplusDoingDataIds.equals(newData.getDataIds())) {
// 表示当前这个newData没有要删除的id,不需要任何修改
continue;
}
if (surplusDoingDataIds.length() == 0) {
// 表示所有数据都处理完成, 这里把newData从列表中删除即可
it.remove();
} else {
// 还有剩余的id,需要更新dataIds
newData.setDataIds(surplusDoingDataIds);
}
if (errorData != null) {
// 如果存在上次错误的id,则需要再添加回newDataList里
newDataList.add(errorData);
}
}
}
// 正常来说,一个task执行完,一定会更新newDataList,这里就不在判断是不是有变化,就直接setData了
stat = zk.setData(schedulePath, ByteUtil.objectToByte(newDataList), stat.getVersion());
if (stat != null) {
return true;
}
} catch (KeeperException.BadVersionException e) {
System.out.println("版本异常" + e.getMessage());
// 这里递归的时候之所以又在方法里想重新执行了一遍业务,是因为有可能要处理的这个newData已经被其他任务节点处理过了,即dataIds在这段时间里被新处理过了
return updateNewDataList(taskData, errorData);
}
return false;
}
总而言之也是两步:
把完成了的id从newData列表中删除掉
把errorData添加进去
最后就是更新TaskData列表了,只要把这个对象删了就OK了
/**
* 更新taskData时,要以任务对象的唯一id来确认,不能以总任务id来确认
*
* @param taskData
* @throws InterruptedException
* @throws KeeperException
*/
private void updateTaskDataList(TaskData taskData) throws KeeperException, InterruptedException {
try {
// taskPath这个节点路径需要持续监听
Stat stat = zk.exists(taskPath, watcher);
List<TaskData> taskDatas = (List<TaskData>) ByteUtil.byteToObject(zk.getData(taskPath, watcher, null));
for (TaskData data : taskDatas) {
if (data.getId().equals(taskData.getId())) {
taskDatas.remove(data);
break;
}
}
zk.setData(taskPath, ByteUtil.objectToByte(taskDatas), stat.getVersion());
} catch (KeeperException.BadVersionException e) {
updateTaskDataList(taskData);
}
}
说明为什么加入了memcache
为了可以支持非幂等性任务,每一次的执行结果必须要记录下来,记录的方式在最开始考虑了几种方案,分别有zk的持久节点,zk的临时节点,缓存,数据库。
先分析zk的两种节点,因为要记录每次的结果,这种频繁更新肯定不适合用一个持久节点来存储,而它本身的临时节点也会因为服务的断开而自行删除。所以zk节点不能用。
而缓存和数据库依赖又比较重,同时还要维护它自身的高可用性,目前没有想到一个比较好的方案,只能从中挑一个基本还能接受的方案,如果有哪位读者有更好的方案,请给我留言。
当然,如果使用场景本身不需要支持非幂等性任务,那么这里就可以不需要任何依赖,直接重新跑一遍就行了。目前还不支持,我会在之后加一个是否需要支持幂等性的开关。
注意: 从代码里可以看出执行成功和缓存保存是分两步完成,不具备事务性,就意味着在极特殊的情况下,缓存里会少保存一个执行成功的id,所以不能保证非幂等性任务的完全准确,在使用时请注意。
任务节点的执行过程说明就这么多了,后面有对调度节点的过程说明。
网友评论