美文网首页
task任务执行节点源码分析

task任务执行节点源码分析

作者: 想起个帅气的头像 | 来源:发表于2017-11-23 17:00 被阅读0次

    执行节点启动说明

    /**
         * 任务节点启动
         * @param rootPath  根路径
         * @param taskPath  任务路径
         * @param taskPath  调度路径
         * @param taskService 业务service
         */
        public static void startup(String rootPath, String taskPath, String schedulePath, TaskService taskService) {
            try {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                TaskWatcher watcher = new TaskWatcher(countDownLatch, taskService, taskPath, schedulePath);
                ZookeeperTaskUtil.init(watcher);
                ZooKeeper zk = ZookeeperTaskUtil.getZookeeper();
                if(zk.exists(rootPath, watcher) == null) {
                    zk.create(rootPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                if(zk.exists(taskPath , watcher) == null) {
                    zk.create(taskPath, ByteUtil.objectToByte(new ArrayList<TaskData>()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    zk.exists(taskPath, watcher);
                }
                String nodeName = zk.create(taskPath+"/task_node", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(nodeName);
                watcher.setNodeName(nodeName.substring(nodeName.lastIndexOf("/")+1, nodeName.length()));
                countDownLatch.countDown();
                while(true) {
                    Thread.sleep(Integer.MAX_VALUE);
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("任务节点启动失败");
            } finally {
            }
            
        }
    

    执行节点启动时,需要做以下几个事情

    1. 创建zk的连接、定义监听watcher
    2. 加入对taskPath节点的子节点监听
    3. 创建临时调度节点

    执行节点业务处理

    执行节点的业务比较简单,就做一件事情,处理分配来的数据

    /**
         * process是阻塞方法
         * 这里要注意,业务处理是开辟线程处理的,所以List<TaskData>里的任务会并发执行,这里要注意同时放到List<TaskData>的任务不能有执行顺序的要求
         * 未来如果支持了任务的优先级,那么同一优先级的任务也不能有执行顺序的要求
         */
        public void process(WatchedEvent event) {
            System.out.println("TaskWatcher event " + event.getPath() + " " + event.getType() + " " + event.getState());
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(event.getType() == EventType.NodeDataChanged) {
                if (event.getPath().equals(taskPath)) {
                    if(taskService != null) {
                        taskService.initProperties(ZookeeperTaskUtil.getZookeeper(), this, nodeName, taskPath, schedulePath);
                    }
                    pool.execute(taskService);
                }
            }
        }
    

    首先需要找出自己节点名下且还未处理过的TaskData,先更新状态为处理中,如果更新成功,就开始对数据进行处理。

    /**
         * 1. 更新这个taskData的状态
         * 2. 执行每一个数据id,并记录每一条id的成功或失败结果
         * 3. 处理完成之后,把这个taskData的数据从任务节点和调度节点的数据列表中删除
         */
        public void findTaskDataToProcess() {
            try {
                // 先取版本,后取数据
                Stat stat = zk.exists(taskPath, watcher);
                List<TaskData> taskDatas = (List<TaskData>) ByteUtil.byteToObject(zk.getData(taskPath, watcher, null));
                if (taskDatas != null && taskDatas.size() > 0) {
                    for (TaskData taskData : taskDatas) {
                        // 如果需要处理的节点是本节点且这段数据还处于未处理的状态
                        if (nodeName.equals(taskData.getNodeName()) && taskData.getStatus() == Constant.STATUS_TASKDATA_NOSTART) {
                            // 更新这个taskData的状态
                            changeDoingStatusForTaskData(taskDatas, taskData, stat);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    // 注意:这里更新时候可能会出现版本不一致的情况
        private void changeDoingStatusForTaskData(List<TaskData> taskDatas, TaskData taskData, Stat stat) throws Exception {
            try {
                taskData.setStatus(Constant.STATUS_TASKDATA_DOING);
                stat = zk.setData(taskPath, ByteUtil.objectToByte(taskDatas), stat.getVersion());
                // taskData被更新成功,可以对其进行业务处理
                if (stat != null) {
                    handleTaskData(taskData);
                }
            } catch (KeeperException.BadVersionException e) {
                // 如果发生版本错误的异常
                System.out.println("出现版本异常" + e.getMessage());
                getNewDataForUpdateForDoingStatus();
            }
        }
    

    业务处理主要分为两步

    1. 调用业务具体实现doAction(),并记录成功和失败的数据
    2. 将成功的数据在zk里进行删除;将失败的数据再次放回到调度节点中
    /**
         * 任务处理
         * 
         * @param taskData
         * @throws Exception
         */
        private void handleTaskData(TaskData taskData) throws Exception {
            //获取任务执行结果
            TaskProgress progress = getTaskProcess(taskData);
            //处理完成之后,更新这个taskData从root的列表中删除
            finishTaskData(progress, taskData);
        }
    /**
         * 获取任务的执行结果
         * 
         * @param taskData
         * @return
         * @throws Exception
         */
        private TaskProgress getTaskProcess(TaskData taskData) throws Exception {
            TaskProgress progress = new TaskProgress();
            String[] strArr = taskData.getDataIds().split(",");
            for (String data : strArr) {
                if (data != null) {
                    if (data.contains("-")) {
                        int left = Integer.valueOf(data.split("-")[0]);
                        int right = Integer.valueOf(data.split("-")[1]);
                        for (int i = left; i <= right; i++) {
                            //执行任务
                            doAction(taskData, progress , i);
                        }
                    } else {
                        doAction(taskData, progress, Integer.valueOf(data));
                    }
                }
            }
            return progress;
        }
    /**
         * 执行任务
         * 
         * @param taskData 任务对象
         * @param progress 记录成功和失败的id
         * @param id 任务数据的id
         * @throws Exception
         */
        private void doAction(TaskData taskData, TaskProgress progress, int id) throws Exception {
            try {
                //如果错误id的执行次数超过了100次,就默认为执行成功,是为了防止无限次错误调用
                if(taskData.getVersion() <= 100) {
                    doAction(id, taskData.getVersion());
                }
                //每成功处理完一个数据,就要把id加到完成队列(字符串)里--memcache
                progress.setSuccessIds(NumberConcatUtil.concatNumber(progress.getSuccessIds(), id));
                memcacheComp.set(taskData.getId() + "_" + taskData.getTaskId() + "_success", progress.getSuccessIds());
            } catch (TaskHandleException e) {
                // 注意:这里约束调用方如果业务执行出错,需抛出TaskHandleException
                // 处理过程中如果出错,则把错误的id加到errorIds中
                progress.setErrorIds(NumberConcatUtil.concatNumber(progress.getErrorIds(), id));
                memcacheComp.set(taskData.getId() + "_" + taskData.getTaskId() + "_error", progress.getErrorIds());
            }
        }
    

    这里面的doAction方法交由调用方实现

    /**
         * 业务处理方法
         * @param id 任务数据id
         * @param version 该任务id被执行的次数,当id执行错误重新执行一次,这个version就会加1
         * 这里要注意,为了防止大量的重复错误调用,业务方需要根据version的值做特殊处理,默认执行100次之后就会认为成功
         * @throws TaskHandleException 如果执行时抛出此异常,表示任务该id执行失败,需重新执行
         */
        public abstract void doAction(long id, int version) throws TaskHandleException;
    

    前面的过程是业务执行过程,当任务执行顺利完成后,将开始第二步处理。

    /**
         * 业务执行完成后,对taskData进行后续处理
         * 1. 处理newData列表的数据:newData中去掉这个task里的dataids(包含成功的和失败的,因为失败的又补充到后面了),加上执行错误的内容
         * 2. 处理taskData列表的数据:第一步成功之后,TaskData可以从列表中直接移除
         * @param progress
         * @param taskData
         */
        private void finishTaskData(TaskProgress progress, TaskData taskData) {
            // 封装上次处理错误需要再次处理的NewData对象
            NewData errorData = encapsulateNewDataByErrorIds(progress.getErrorIds(), taskData);
            try {
                //对newDataList列表中的数据进行更新处理
                boolean flag = updateNewDataList(taskData, errorData);
                if (flag) {
                    // newDataList更新成功,则再去处理taskDataList
                    updateTaskDataList(taskData);
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    封装errorData就不多说了,主要是更新NewData列表和TaskData列表这两个方法。

    /**
         * 更新NewData列表
         */
        private boolean updateNewDataList(TaskData taskData, NewData errorData) throws KeeperException, InterruptedException {
            try {
                // task节点不需要监听新数据节点的变化
                Stat stat = zk.exists(schedulePath, false);
                List<NewData> newDataList = (List<NewData>) ByteUtil.byteToObject(zk.getData(schedulePath, false, null));
                Iterator<NewData> it = newDataList.iterator();
                while (it.hasNext()) {
                    NewData newData = it.next();
                    if (newData.getTaskId() == taskData.getTaskId() && newData.getStatus() == Constant.STATUS_NEWDATA_PUBLISH) {
                        // 剩余还处于执行状态下的dataIds
                        String surplusDoingDataIds = NumberSubstringUtil.substringNumber(newData.getDataIds(), taskData.getDataIds());
                        if (surplusDoingDataIds.equals(newData.getDataIds())) {
                            // 表示当前这个newData没有要删除的id,不需要任何修改
                            continue;
                        }
                        if (surplusDoingDataIds.length() == 0) {
                            // 表示所有数据都处理完成, 这里把newData从列表中删除即可
                            it.remove();
                        } else {
                            // 还有剩余的id,需要更新dataIds
                            newData.setDataIds(surplusDoingDataIds);
                        }
                        if (errorData != null) {
                            // 如果存在上次错误的id,则需要再添加回newDataList里
                            newDataList.add(errorData);
                        }
                    }
                }
                // 正常来说,一个task执行完,一定会更新newDataList,这里就不在判断是不是有变化,就直接setData了
                stat = zk.setData(schedulePath, ByteUtil.objectToByte(newDataList), stat.getVersion());
                if (stat != null) {
                    return true;
                }
            } catch (KeeperException.BadVersionException e) {
                System.out.println("版本异常" + e.getMessage());
                // 这里递归的时候之所以又在方法里想重新执行了一遍业务,是因为有可能要处理的这个newData已经被其他任务节点处理过了,即dataIds在这段时间里被新处理过了
                return updateNewDataList(taskData, errorData);
            }
            return false;
        }
    

    总而言之也是两步:
    把完成了的id从newData列表中删除掉
    把errorData添加进去

    最后就是更新TaskData列表了,只要把这个对象删了就OK了

    /**
         * 更新taskData时,要以任务对象的唯一id来确认,不能以总任务id来确认
         * 
         * @param taskData
         * @throws InterruptedException
         * @throws KeeperException
         */
        private void updateTaskDataList(TaskData taskData) throws KeeperException, InterruptedException {
            try {
                // taskPath这个节点路径需要持续监听
                Stat stat = zk.exists(taskPath, watcher);
                List<TaskData> taskDatas = (List<TaskData>) ByteUtil.byteToObject(zk.getData(taskPath, watcher, null));
                for (TaskData data : taskDatas) {
                    if (data.getId().equals(taskData.getId())) {
                        taskDatas.remove(data);
                        break;
                    }
                }
                zk.setData(taskPath, ByteUtil.objectToByte(taskDatas), stat.getVersion());
            } catch (KeeperException.BadVersionException e) {
                updateTaskDataList(taskData);
            }
        }
    

    说明为什么加入了memcache

    为了可以支持非幂等性任务,每一次的执行结果必须要记录下来,记录的方式在最开始考虑了几种方案,分别有zk的持久节点,zk的临时节点,缓存,数据库。
    先分析zk的两种节点,因为要记录每次的结果,这种频繁更新肯定不适合用一个持久节点来存储,而它本身的临时节点也会因为服务的断开而自行删除。所以zk节点不能用。
    而缓存和数据库依赖又比较重,同时还要维护它自身的高可用性,目前没有想到一个比较好的方案,只能从中挑一个基本还能接受的方案,如果有哪位读者有更好的方案,请给我留言。
    当然,如果使用场景本身不需要支持非幂等性任务,那么这里就可以不需要任何依赖,直接重新跑一遍就行了。目前还不支持,我会在之后加一个是否需要支持幂等性的开关。

    注意: 从代码里可以看出执行成功和缓存保存是分两步完成,不具备事务性,就意味着在极特殊的情况下,缓存里会少保存一个执行成功的id,所以不能保证非幂等性任务的完全准确,在使用时请注意。

    任务节点的执行过程说明就这么多了,后面有对调度节点的过程说明。

    相关文章

      网友评论

          本文标题:task任务执行节点源码分析

          本文链接:https://www.haomeiwen.com/subject/zrpovxtx.html