美文网首页
task任务执行节点源码分析

task任务执行节点源码分析

作者: 想起个帅气的头像 | 来源:发表于2017-11-23 17:00 被阅读0次

执行节点启动说明

/**
     * 任务节点启动
     * @param rootPath  根路径
     * @param taskPath  任务路径
     * @param taskPath  调度路径
     * @param taskService 业务service
     */
    public static void startup(String rootPath, String taskPath, String schedulePath, TaskService taskService) {
        try {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            TaskWatcher watcher = new TaskWatcher(countDownLatch, taskService, taskPath, schedulePath);
            ZookeeperTaskUtil.init(watcher);
            ZooKeeper zk = ZookeeperTaskUtil.getZookeeper();
            if(zk.exists(rootPath, watcher) == null) {
                zk.create(rootPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if(zk.exists(taskPath , watcher) == null) {
                zk.create(taskPath, ByteUtil.objectToByte(new ArrayList<TaskData>()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                zk.exists(taskPath, watcher);
            }
            String nodeName = zk.create(taskPath+"/task_node", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(nodeName);
            watcher.setNodeName(nodeName.substring(nodeName.lastIndexOf("/")+1, nodeName.length()));
            countDownLatch.countDown();
            while(true) {
                Thread.sleep(Integer.MAX_VALUE);
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("任务节点启动失败");
        } finally {
        }
        
    }

执行节点启动时,需要做以下几个事情

  1. 创建zk的连接、定义监听watcher
  2. 加入对taskPath节点的子节点监听
  3. 创建临时调度节点

执行节点业务处理

执行节点的业务比较简单,就做一件事情,处理分配来的数据

/**
     * process是阻塞方法
     * 这里要注意,业务处理是开辟线程处理的,所以List<TaskData>里的任务会并发执行,这里要注意同时放到List<TaskData>的任务不能有执行顺序的要求
     * 未来如果支持了任务的优先级,那么同一优先级的任务也不能有执行顺序的要求
     */
    public void process(WatchedEvent event) {
        System.out.println("TaskWatcher event " + event.getPath() + " " + event.getType() + " " + event.getState());
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(event.getType() == EventType.NodeDataChanged) {
            if (event.getPath().equals(taskPath)) {
                if(taskService != null) {
                    taskService.initProperties(ZookeeperTaskUtil.getZookeeper(), this, nodeName, taskPath, schedulePath);
                }
                pool.execute(taskService);
            }
        }
    }

首先需要找出自己节点名下且还未处理过的TaskData,先更新状态为处理中,如果更新成功,就开始对数据进行处理。

/**
     * 1. 更新这个taskData的状态
     * 2. 执行每一个数据id,并记录每一条id的成功或失败结果
     * 3. 处理完成之后,把这个taskData的数据从任务节点和调度节点的数据列表中删除
     */
    public void findTaskDataToProcess() {
        try {
            // 先取版本,后取数据
            Stat stat = zk.exists(taskPath, watcher);
            List<TaskData> taskDatas = (List<TaskData>) ByteUtil.byteToObject(zk.getData(taskPath, watcher, null));
            if (taskDatas != null && taskDatas.size() > 0) {
                for (TaskData taskData : taskDatas) {
                    // 如果需要处理的节点是本节点且这段数据还处于未处理的状态
                    if (nodeName.equals(taskData.getNodeName()) && taskData.getStatus() == Constant.STATUS_TASKDATA_NOSTART) {
                        // 更新这个taskData的状态
                        changeDoingStatusForTaskData(taskDatas, taskData, stat);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

// 注意:这里更新时候可能会出现版本不一致的情况
    private void changeDoingStatusForTaskData(List<TaskData> taskDatas, TaskData taskData, Stat stat) throws Exception {
        try {
            taskData.setStatus(Constant.STATUS_TASKDATA_DOING);
            stat = zk.setData(taskPath, ByteUtil.objectToByte(taskDatas), stat.getVersion());
            // taskData被更新成功,可以对其进行业务处理
            if (stat != null) {
                handleTaskData(taskData);
            }
        } catch (KeeperException.BadVersionException e) {
            // 如果发生版本错误的异常
            System.out.println("出现版本异常" + e.getMessage());
            getNewDataForUpdateForDoingStatus();
        }
    }

业务处理主要分为两步

  1. 调用业务具体实现doAction(),并记录成功和失败的数据
  2. 将成功的数据在zk里进行删除;将失败的数据再次放回到调度节点中
/**
     * 任务处理
     * 
     * @param taskData
     * @throws Exception
     */
    private void handleTaskData(TaskData taskData) throws Exception {
        //获取任务执行结果
        TaskProgress progress = getTaskProcess(taskData);
        //处理完成之后,更新这个taskData从root的列表中删除
        finishTaskData(progress, taskData);
    }
/**
     * 获取任务的执行结果
     * 
     * @param taskData
     * @return
     * @throws Exception
     */
    private TaskProgress getTaskProcess(TaskData taskData) throws Exception {
        TaskProgress progress = new TaskProgress();
        String[] strArr = taskData.getDataIds().split(",");
        for (String data : strArr) {
            if (data != null) {
                if (data.contains("-")) {
                    int left = Integer.valueOf(data.split("-")[0]);
                    int right = Integer.valueOf(data.split("-")[1]);
                    for (int i = left; i <= right; i++) {
                        //执行任务
                        doAction(taskData, progress , i);
                    }
                } else {
                    doAction(taskData, progress, Integer.valueOf(data));
                }
            }
        }
        return progress;
    }
/**
     * 执行任务
     * 
     * @param taskData 任务对象
     * @param progress 记录成功和失败的id
     * @param id 任务数据的id
     * @throws Exception
     */
    private void doAction(TaskData taskData, TaskProgress progress, int id) throws Exception {
        try {
            //如果错误id的执行次数超过了100次,就默认为执行成功,是为了防止无限次错误调用
            if(taskData.getVersion() <= 100) {
                doAction(id, taskData.getVersion());
            }
            //每成功处理完一个数据,就要把id加到完成队列(字符串)里--memcache
            progress.setSuccessIds(NumberConcatUtil.concatNumber(progress.getSuccessIds(), id));
            memcacheComp.set(taskData.getId() + "_" + taskData.getTaskId() + "_success", progress.getSuccessIds());
        } catch (TaskHandleException e) {
            // 注意:这里约束调用方如果业务执行出错,需抛出TaskHandleException
            // 处理过程中如果出错,则把错误的id加到errorIds中
            progress.setErrorIds(NumberConcatUtil.concatNumber(progress.getErrorIds(), id));
            memcacheComp.set(taskData.getId() + "_" + taskData.getTaskId() + "_error", progress.getErrorIds());
        }
    }

这里面的doAction方法交由调用方实现

/**
     * 业务处理方法
     * @param id 任务数据id
     * @param version 该任务id被执行的次数,当id执行错误重新执行一次,这个version就会加1
     * 这里要注意,为了防止大量的重复错误调用,业务方需要根据version的值做特殊处理,默认执行100次之后就会认为成功
     * @throws TaskHandleException 如果执行时抛出此异常,表示任务该id执行失败,需重新执行
     */
    public abstract void doAction(long id, int version) throws TaskHandleException;

前面的过程是业务执行过程,当任务执行顺利完成后,将开始第二步处理。

/**
     * 业务执行完成后,对taskData进行后续处理
     * 1. 处理newData列表的数据:newData中去掉这个task里的dataids(包含成功的和失败的,因为失败的又补充到后面了),加上执行错误的内容
     * 2. 处理taskData列表的数据:第一步成功之后,TaskData可以从列表中直接移除
     * @param progress
     * @param taskData
     */
    private void finishTaskData(TaskProgress progress, TaskData taskData) {
        // 封装上次处理错误需要再次处理的NewData对象
        NewData errorData = encapsulateNewDataByErrorIds(progress.getErrorIds(), taskData);
        try {
            //对newDataList列表中的数据进行更新处理
            boolean flag = updateNewDataList(taskData, errorData);
            if (flag) {
                // newDataList更新成功,则再去处理taskDataList
                updateTaskDataList(taskData);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

封装errorData就不多说了,主要是更新NewData列表和TaskData列表这两个方法。

/**
     * 更新NewData列表
     */
    private boolean updateNewDataList(TaskData taskData, NewData errorData) throws KeeperException, InterruptedException {
        try {
            // task节点不需要监听新数据节点的变化
            Stat stat = zk.exists(schedulePath, false);
            List<NewData> newDataList = (List<NewData>) ByteUtil.byteToObject(zk.getData(schedulePath, false, null));
            Iterator<NewData> it = newDataList.iterator();
            while (it.hasNext()) {
                NewData newData = it.next();
                if (newData.getTaskId() == taskData.getTaskId() && newData.getStatus() == Constant.STATUS_NEWDATA_PUBLISH) {
                    // 剩余还处于执行状态下的dataIds
                    String surplusDoingDataIds = NumberSubstringUtil.substringNumber(newData.getDataIds(), taskData.getDataIds());
                    if (surplusDoingDataIds.equals(newData.getDataIds())) {
                        // 表示当前这个newData没有要删除的id,不需要任何修改
                        continue;
                    }
                    if (surplusDoingDataIds.length() == 0) {
                        // 表示所有数据都处理完成, 这里把newData从列表中删除即可
                        it.remove();
                    } else {
                        // 还有剩余的id,需要更新dataIds
                        newData.setDataIds(surplusDoingDataIds);
                    }
                    if (errorData != null) {
                        // 如果存在上次错误的id,则需要再添加回newDataList里
                        newDataList.add(errorData);
                    }
                }
            }
            // 正常来说,一个task执行完,一定会更新newDataList,这里就不在判断是不是有变化,就直接setData了
            stat = zk.setData(schedulePath, ByteUtil.objectToByte(newDataList), stat.getVersion());
            if (stat != null) {
                return true;
            }
        } catch (KeeperException.BadVersionException e) {
            System.out.println("版本异常" + e.getMessage());
            // 这里递归的时候之所以又在方法里想重新执行了一遍业务,是因为有可能要处理的这个newData已经被其他任务节点处理过了,即dataIds在这段时间里被新处理过了
            return updateNewDataList(taskData, errorData);
        }
        return false;
    }

总而言之也是两步:
把完成了的id从newData列表中删除掉
把errorData添加进去

最后就是更新TaskData列表了,只要把这个对象删了就OK了

/**
     * 更新taskData时,要以任务对象的唯一id来确认,不能以总任务id来确认
     * 
     * @param taskData
     * @throws InterruptedException
     * @throws KeeperException
     */
    private void updateTaskDataList(TaskData taskData) throws KeeperException, InterruptedException {
        try {
            // taskPath这个节点路径需要持续监听
            Stat stat = zk.exists(taskPath, watcher);
            List<TaskData> taskDatas = (List<TaskData>) ByteUtil.byteToObject(zk.getData(taskPath, watcher, null));
            for (TaskData data : taskDatas) {
                if (data.getId().equals(taskData.getId())) {
                    taskDatas.remove(data);
                    break;
                }
            }
            zk.setData(taskPath, ByteUtil.objectToByte(taskDatas), stat.getVersion());
        } catch (KeeperException.BadVersionException e) {
            updateTaskDataList(taskData);
        }
    }

说明为什么加入了memcache

为了可以支持非幂等性任务,每一次的执行结果必须要记录下来,记录的方式在最开始考虑了几种方案,分别有zk的持久节点,zk的临时节点,缓存,数据库。
先分析zk的两种节点,因为要记录每次的结果,这种频繁更新肯定不适合用一个持久节点来存储,而它本身的临时节点也会因为服务的断开而自行删除。所以zk节点不能用。
而缓存和数据库依赖又比较重,同时还要维护它自身的高可用性,目前没有想到一个比较好的方案,只能从中挑一个基本还能接受的方案,如果有哪位读者有更好的方案,请给我留言。
当然,如果使用场景本身不需要支持非幂等性任务,那么这里就可以不需要任何依赖,直接重新跑一遍就行了。目前还不支持,我会在之后加一个是否需要支持幂等性的开关。

注意: 从代码里可以看出执行成功和缓存保存是分两步完成,不具备事务性,就意味着在极特殊的情况下,缓存里会少保存一个执行成功的id,所以不能保证非幂等性任务的完全准确,在使用时请注意。

任务节点的执行过程说明就这么多了,后面有对调度节点的过程说明。

相关文章

网友评论

      本文标题:task任务执行节点源码分析

      本文链接:https://www.haomeiwen.com/subject/zrpovxtx.html