美文网首页
schedule调度节点任务执行源码分析

schedule调度节点任务执行源码分析

作者: 想起个帅气的头像 | 来源:发表于2017-11-21 18:26 被阅读0次

调度节点启动说明

public class ScheduleExecute {
   /**
     * 调度节点启动
     * @param rootPath  根路径
     * @param schedulePath  调度路径
     * @param taskPath  任务路径
     */
    public static void startup(String rootPath, String schedulePath, String taskPath) {
        try {
            ScheduleWatcher watcher = new ScheduleWatcher(schedulePath, taskPath);
            ZookeeperScheduleUtil.init(watcher);
            ZooKeeper zk = ZookeeperScheduleUtil.getZookeeper();
            if(zk.exists(rootPath, watcher) == null) {
                zk.create(rootPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if(zk.exists(schedulePath, watcher) == null) {
                zk.create(schedulePath, ByteUtil.objectToByte(new ArrayList<NewData>()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                //开启对schedulePath的监听
                zk.exists(schedulePath, watcher);
            }
            zk.create(schedulePath+"/schedule_node", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            zk.getChildren(taskPath, watcher);
            //数据补偿处理
            CompensationHandler handler = new CompensationHandler(zk, watcher);
            handler.dataCompensation();
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

调度节点启动时,需要做以下几个事情

  1. 创建zk的连接、定义监听watcher
  2. 加入对schedulePath节点的监听事件
  3. 加入对taskPath节点的子节点监听
  4. 创建临时调度节点

代码里有个CompensationHandler类,在这个类里启动了一个调度线程,用于做数据不一致的补偿操作,后续会对这个类做详细说明。

调度节点业务处理

  1. 负责数据的分配(数据变化)

  2. 监听任务子节点的变化(节点变化)

   /**
     * 该方法是一个阻塞方法
     */
    public void process(WatchedEvent event) {
        System.out.println("ScheduleWatcher event " + event.getPath() + " " + event.getType() + " " + event.getState());
        if(event.getType() == EventType.NodeDataChanged) {
            if(event.getPath().equals(schedulePath)) {
                //处理调度节点下的数据变化事件
                pool.execute(new ScheduleDataChangeService(ZookeeperScheduleUtil.getZookeeper(), this, schedulePath, taskPath));
            
            }
        }
        if(event.getType() == EventType.NodeChildrenChanged) {
            if(event.getPath().equals(taskPath)) {
                //处理任务子节点发生变化的事件
                pool.execute(new ScheduleTaskNodeChangeService(ZookeeperScheduleUtil.getZookeeper(), this, schedulePath, taskPath));
            
            }
        }
    }

首先分析下数据如何分配

zk的watcher事件监听是一个一次性行为,那么就意味着每次收到监听事件之后,都要再次发起对该事件(如:调用exist方法啊,这里可查看别的文章看方法对应事件),确保还可以再次获得相同事件。

对于一个调用节点来说,正常环境下肯定会有多个节点同时服务,那么在分配数据的时候,应该交由哪个节点来做呢?

有两种方案可以实现:

方案1. (当前方案)多节点平级,以资源抢占的方式来做数据分配。

解释一下,zk下更新数据是具备乐观锁的,多个服务同时更新一份数据,只有一个服务可以更新成功。

这里也是以这个特性来进行控制的。我们在NewData对象下有一个状态,用于判断这份数据是否已经分配。虽然多个调度节点可能同时收到通知,但只有一个节点会率先更新这个状态,那么这里就以“谁更新,谁分配”的策略来处理。

public class NewData implements Serializable{
    private long taskId;        //本次任务的编号id,用于关联执行taskData的taskid
    private String dataIds;     //要执行的所有id范围  1-6000000
    private int status;         //标志,用于判断新数据有没有分配过 0没有, 1有
    private int version;        //版本,因为未来错误的任务id同样会放到这个newData里,所以这里设置版本来记录id被执行了多少次,当处理N次还不成功的时候,可以做一些其他的处理
}
    /**
     *  更新还未分配的新数据,如果更新成功,则分配的工作就由这个watcher来处理
     *  如果分配失败,出现版本错误,说明已有其他的watcher处理过,本watcher重新获取新数据,处理其他的新数据
     */
    private void changeNewDataStatus(NewData newData, List<NewData> newDataList, Stat stat) throws KeeperException, InterruptedException {
        newData.setStatus(Constant.STATUS_NEWDATA_PUBLISH);
        try {
            stat = zk.setData(schedulePath, ByteUtil.objectToByte(newDataList), stat.getVersion());
            if(stat != null) {
                //如果更新成功了,则开始分配数据
                publishNewData(newData);
            }
        } catch (KeeperException.BadVersionException e) {
            // 如果发生版本错误的异常
            System.out.println("出现版本异常" + e.getMessage());
            //发生一次版本错误,就说明数据肯定是旧的,后面的更新肯定失败,就需要再重新取一次数据
            getNewDataForUpdateForPublishStatus();
        }
    }
方案2. 既然是只能有一个节点来做,那我们就指定一个节点来做好了,怎么指定,就以主从的方式来指定。

那么怎样认定主从呢?zk在创建节点的时候是可以以自增id的方式分配节点名称的,那我们就可以认定id小的节点是主节点,每次收到监听事件时,都拿各自的节点名称去比看自己是不是最小的那个,是就处理,不是就丢弃。

优缺点比较

下面说下这两个方案的优缺点,他们的优缺点刚好是相对的

方案1的缺点在于每次触发NodeDataChanged,所有节点都要执行n次,且多为无意义的执行,举个例子,三个服务节点每次来一个新数据,都要执行一次,失败的节点还要递归第二次,修改成功后还会全部触发一次,这样看来真正的数据处理1次,其他的全是无意义的执行。

方案2的好处可以极大的减小执行次数,除了主节点会执行2次(状态修改前一次,修改后一次),其他节点只是会比较节点name。

方案2的缺点在于当主节点的业务执行一半突然down机,那这次处理就停止了,出现业务执行不完整的问题。

而方案1刚好可以解决,每个节点拿到监听事件之后,不去管其他节点的执行情况,只要能更新就更新,所以即使有down机问题,也会由其他的节点来处理完成。

因此,我们可以把方案1和方案2结合作为方案3,也就是实现一个多主多从的方式,既可以节省一些无意义的代码执行,也可以解决单个主引起的单点问题。如果调度节点比较多,可以以这个方案来实现。

看这里大家可能会产生一些各种情况下down机的疑问,关于这个问题放到最后一篇来说明。

最后是数据分配的代码, 主要分下面两步,比较简单,就不多说了
  1. 先确认当前有多少个可以处理任务的节点
  2. 把要处理的数据基本平均分配到所有节点上
/**
     * 开始分配数据
     * 1. 先确认当前有多少个可以处理任务的节点
     * 2. 把要处理的数据基本平均分配到所有节点上
     * @param newData
     * @throws InterruptedException 
     * @throws KeeperException 
     */
    private void publishNewData(NewData newData) throws KeeperException, InterruptedException {
        List<TaskData> pubTaskDatas = new ArrayList<TaskData>();   //存放分配好的任务数据
        List<String> nodes = zk.getChildren(taskPath, watcher);
        int nodeSize = nodes.size();
        List<String> list = NumberGroupUtil.groupNumber(newData.getDataIds(), nodeSize);
        if(list != null && list.size() > 0) {
            //这组数据需要统一id,以供后续查找
            for (int i=0;i<list.size();i++) {
                TaskData taskData = new TaskData();
                taskData.setId(nodes.get(i)+"_"+System.currentTimeMillis());
                taskData.setTaskId(newData.getTaskId());
                taskData.setNodeName(nodes.get(i));
                taskData.setDataIds(list.get(i));
                taskData.setStatus(Constant.STATUS_TASKDATA_NOSTART);
                taskData.setCreatetime(new Date());
                taskData.setVersion(newData.getVersion());
                pubTaskDatas.add(taskData);
            }
        }
        System.out.println("分配好的task数据:"+pubTaskDatas);
        addTasks(pubTaskDatas);
    }

    /**
     * 把分配好的任务加入到taskPath下
     * @param pubTaskDatas
     * @throws KeeperException
     * @throws InterruptedException
     */
    private void addTasks(List<TaskData> pubTaskDatas) throws KeeperException, InterruptedException {
        try {
            Stat stat = zk.exists(taskPath, watcher);
            List<TaskData> taskDatas = (List<TaskData>) ByteUtil.byteToObject(zk.getData(taskPath, watcher, null));
            if(taskDatas == null) {
                taskDatas = new ArrayList<TaskData>();
            }
            taskDatas.addAll(pubTaskDatas);
            zk.setData(taskPath, ByteUtil.objectToByte(taskDatas), stat.getVersion());
        } catch (KeeperException.BadVersionException e) {
            // 如果发生版本错误的异常
            System.out.println("出现版本异常" + e.getMessage());
            addTasks(pubTaskDatas);
        }
    }
/**
 * 任务数据对象
 * @author 常博
 *
 */
public class TaskData implements Serializable{
    private String id;          //这个任务的独有id,可以唯一确定这个任务的id,默认处理节点的name+时间戳
    private long taskId;         //对应NewData总任务的编号 默认取时间戳
    private String nodeName;  //节点名称
    private int status;      //0 未处理   1正在处理  2处理完成
    private String dataIds; //要处理的数据范围 1-1500000
    private Date createtime;    // 创建时间
    private int version;    //版本,同newData分配过来的版本
    ......
}

数据分配部分的分析到此为止,下面说明第二个主业务:

监听任务节点的变化

任务节点的变化无外乎就是新增和删除,正常情况下的新增和删除都不会有问题,因为都是临时节点,zk的节点结构也会同时发生变化。
难点在于调度节点刚好给一个任务节点分配了任务或者任务执行了一部分后down了。

这个时候就需要

  1. 找出给down机的节点分配的任务;
  2. 知道成功处理了哪些,还没处理哪些;
  3. 把没完成的重新分配。
1. 如何找出down机节点的任务

在找任务之前要先知道是哪个节点down了,watcher事件本身不会告诉我们哪个节点down了,只会告诉我们有变化,这时就需要自行比较找出来。
在之前分配数据的时候,我们已经在TaskData中记录了节点的nodename,因此我们可以比较taskPath下的所有TaskData对象看看哪个对象的nodename已经不存在了,不存在的就是我们要找的TaskData。
在比较的时候我们要同时依赖schedulePath下的NewData列表对象,因为任务节点在处理任务成功后,会先更新NewData列表,后更新TaskData列表(具体请参考另一篇文章), 可能出现NewData列表不存在但是TaskData列表存在的情况(这个特殊情况会由CompensationHandler类来扫描处理),因此我们找出的TaskData的taskId在NewData中也存在。

    /**
     * 获取所有错误节点的任务
     * @param children 当前存在的节点nodename
     * @param newDataList
     * @param taskDataList
     * @return
     */
    private List<TaskData> findErrorNodeDataForNewData(List<String> children, List<NewData> newDataList, List<TaskData> taskDataList) {
        List<TaskData> inexistTastDataList = new ArrayList<TaskData>();
        for (NewData newData : newDataList) {
            //先根据newData找对应的taskData列表
            List<TaskData> subTaskDatas = getTaskDatasForNewData(newData, taskDataList);
            //在根据subTaskDatas找出不存在的节点数据
            List<TaskData> _inexist = compareNodesForErrorTaskData(subTaskDatas, children);
            if(_inexist != null && _inexist.size() > 0) {
                inexistTastDataList.addAll(_inexist);
            }
        }
        return inexistTastDataList;
    }

    /**
     * 获取newData分配的所有子任务
     * @param newData
     * @param taskDataList
     * @return
     */
    private List<TaskData> getTaskDatasForNewData(NewData newData, List<TaskData> taskDataList) {
        List<TaskData> subTaskData = new ArrayList<TaskData>();
        for (TaskData taskData : taskDataList) {
            if(taskData.getTaskId() == newData.getTaskId()) {
                subTaskData.add(taskData);
            }
        }
        return subTaskData;
    }

    /**
     * 比较两个节点列表,返回taskData里已经不存在的节点数据
     * @param subTaskDatas 分配的节点任务
     * @param children  当前存在的节点
     */
    private List<TaskData> compareNodesForErrorTaskData(List<TaskData> subTaskDatas, List<String> children) {
        List<TaskData> inexistTastData = new ArrayList<TaskData>();
        for (TaskData subTaskData : subTaskDatas) {
            if(!children.contains(subTaskData.getNodeName())) {
                inexistTastData.add(subTaskData);
            }
        }
        return inexistTastData;
    }
2. 处理包含错误节点的数据
/**
     * 处理包含不存在节点的数据
     * 1. 从这些节点中,找出已成功完成了的数据
     * 2. 更新newData列表里的数据(把成功了的数据从列表里去掉)
     * 3. 更新taskData列表的数据(把错误节点对象去掉,把剩余的节点数据分配到列表里)
     * 2要在3之前来执行
     * 这里面有很重要的一点,如果2执行成功了,3失败了,这里会有其他的schedule节点来继续执行
     * 因为多个schedule会同时收到节点变化的通知,他们同时去处理这个业务,只有一个节点能处理成功,
     * 当其他节点在第二次尝试更新的时候,如果2成功了,再次执行也无影响;3如果之前失败,则成功执行3;3如果之前成功,则已经找不到失败的节点,方法自然完成
     * @param pendingTaskDataList 包含错误节点的任务列表
     * @throws Exception 
     */
    private void processPendingTaskDataList(List<TaskData> pendingTaskDataList, List<NewData> newDataList, List<TaskData> taskDataList, Stat newDataStat, Stat taskDataStat) throws Exception {
        //从newData列表中去掉所有已经成功了的数据
        processNewDataList(pendingTaskDataList, newDataList, newDataStat);
        //从taskData列表中去掉所有pending节点数据, 并把剩余的dataIds重新分配
        processTaskDataList(pendingTaskDataList, taskDataList, taskDataStat);
    }

/**
     * 遍历待处理的任务数据,从newData列表中去掉所有已经成功了的数据
     * @param pendingTaskDataList
     * @param newDataList
     * @param newDataStat
     * @throws Exception
     */
    private void processNewDataList(List<TaskData> pendingTaskDataList, List<NewData> newDataList, Stat newDataStat) throws Exception {
        try {
            for (TaskData pendingTaskData : pendingTaskDataList) {
                long taskId = pendingTaskData.getTaskId();
                String successIds = memcacheComp.get(pendingTaskData.getId()+"_"+taskId+"_success");
                List<NewData> subNewDataList = getNewDataListByTaskId(taskId, newDataList);
                for (NewData subNewData : subNewDataList) {
                    //把newData里的已经成功完成了的数据,从总数据中移除(这里只删除成功的,未执行的不动)
                    String unfinishNewDataIds = NumberSubstringUtil.substringNumber(subNewData.getDataIds(), successIds);
                    subNewData.setDataIds(unfinishNewDataIds);
                }
            }
            zk.setData(schedulePath, ByteUtil.objectToByte(newDataList), newDataStat.getVersion());
            System.out.println("处理错误节点数据之后的newDataList: " + newDataList);
        } catch (KeeperException.BadVersionException e) {
            //这里要注意: 当发生版本更新异常时,要对整个业务做重新获取和比较,因为这里面涉及到的所有任务有可能都产生了变化
            //当执行N次之后,节点变化的问题有可能让本schedule处理,也有可能让别的schedule节点处理,会自然结束。
            processTaskNodeChildrenChanged();
        }
    }

/**
     * 从taskData列表中去掉所有pending节点数据, 并把剩余的dataIds重新分配
     * @param pendingTaskDataList
     * @param taskDataList
     * @param taskDataStat
     * @throws Exception
     */
    private void processTaskDataList(List<TaskData> pendingTaskDataList, List<TaskData> taskDataList, Stat taskDataStat) throws Exception {
        try {
            for (TaskData pendingTaskData : pendingTaskDataList) {
                //这里要一个个的更新,因为不同的pendingTaskData可能属于不同taskId,不能合并
                long taskId = pendingTaskData.getTaskId();
                String successIds = memcacheComp.get(pendingTaskData.getId()+"_"+taskId+"_success");
                //把taskDataList里的pending对象移除(这里做整体移除,剩余的id会新建任务对象放进去)
                String surplusIds = NumberSubstringUtil.substringNumber(pendingTaskData.getDataIds(), successIds);
                boolean removeResult = taskDataList.remove(pendingTaskData);
                System.out.println("removeTaskNodeData: " + removeResult);
                List<TaskData> surplusTaskDataList = publishSurplusIdsAgain(surplusIds, pendingTaskData); 
                taskDataList.addAll(surplusTaskDataList);
            }
            zk.setData(taskPath, ByteUtil.objectToByte(taskDataList), taskDataStat.getVersion());
            System.out.println("处理错误节点之后的taskDataList: " + taskDataList);
        } catch (KeeperException.BadVersionException e) {
            //这里要注意: 当发生版本更新异常时,要对整个业务做重新获取和比较,因为这里面涉及到的所有任务有可能都产生了变化
            //当执行N次之后,节点变化的问题有可能让本schedule处理,也有可能让别的schedule节点处理,会自然结束。
            processTaskNodeChildrenChanged();
        }
    }

从代码中可以看出,处理错误节点的数据主要分为了两块内容

  1. 把NewData列表中的成功的部分删除掉,而执行成功的部分在task节点执行的时候就放到的memcache中。所以这里从memcache里取出来就OK了。至于为什么用到了memcache,task任务介绍篇的末尾已经做了说明。
  2. 从每个TaskData中找出剩余的id,把这些剩余的id按照之前的分配规则生成新的TaskData列表,把这个列表加到原TaskData列表中,并把这个含错误的TaskData从原TaskData列表中移除
  3. 把这个新处理过的TaskData列表set到zk的taskPath节点中,来触发任务节点的NodeDataChanged
    /**
     * 执行任务
     * @param taskData 任务对象
     * @param progress 记录成功和失败的id
     * @param id 任务数据的id
     * @throws Exception
     */
    private void doAction(TaskData taskData, TaskProgress progress, int id) throws Exception {
        try {
            //如果错误id的执行次数超过了100次,就默认为执行成功,是为了防止无限次错误调用
            if(taskData.getVersion() <= 100) {
                doAction(id, taskData.getVersion());
            }
            // 线程处理时,每处理完一个数据,就要把id加到完成队列(字符串)里--memcache
            progress.setSuccessIds(NumberConcatUtil.concatNumber(progress.getSuccessIds(), id));
            memcacheComp.set(taskData.getId() + "_" + taskData.getTaskId() + "_success", progress.getSuccessIds());
        } catch (TaskHandleException e) {
            // 3. 处理过程中如果出错,则把错误的id加到errorIds中
            progress.setErrorIds(NumberConcatUtil.concatNumber(progress.getErrorIds(), id));
            memcacheComp.set(taskData.getId() + "_" + taskData.getTaskId() + "_error", progress.getErrorIds());
        }
    }

相关文章

网友评论

      本文标题:schedule调度节点任务执行源码分析

      本文链接:https://www.haomeiwen.com/subject/mzvivxtx.html