美文网首页
剖析azkaban解析作业流

剖析azkaban解析作业流

作者: wangqiaoshi | 来源:发表于2017-01-22 11:20 被阅读0次

    azkaban是没有创建作业流,创建任务,建立任务之间关系的操作的.
    它本身的工作模式,在本地进行编辑.创建任务,创建作业流,建立依赖关系,然后打成zip上传.

    DirectoryFlowLoader解压zip包,然后解析解压出来的zip目录.
    首先需要两个基础类Node,Edge.
    Node表示的是作业流中的节点信息,包含job文件名,properties属性文件名,作业类型,
    Edge表示的是节点之间边信息.

    loadProjectFlow

    将project目录解析成flow,properties json串,将一些无效的job,flow预先剔除掉.

    无效的job:

    • type属性没写的
    • 重复job(多次出现将会被剔除掉)

    无效依赖:

    • 只身依赖(自己依赖自己)
    • 无效依赖(所依赖的job不存在)
    • 依赖于job是个重复job

    代码

    public void loadProjectFlow(Project project, File baseDirectory) {
        propsList = new ArrayList<Props>();//整个工程的.properties配置列表
        flowPropsList = new ArrayList<FlowProps>();//作业流.properties配置列表
        jobPropsMap = new HashMap<String, Props>();//jobName->.job配置
        nodeMap = new HashMap<String, Node>();//jobName->Node
        flowMap = new HashMap<String, Flow>();//flowName->Flow
        errors = new HashSet<String>();
        duplicateJobs = new HashSet<String>();//重复任务名(jobName)
        nodeDependencies = new HashMap<String, Map<String, Edge>>();//jobname->sourceJobName->依赖的边
        rootNodes = new HashSet<String>();//根节点,解释一下,这里的根节点是flow中最后的节点
        flowDependencies = new HashMap<String, Set<String>>();//flow于flow之间的依赖关系,解决内嵌之间依赖关系
    
        // Load all the props files and create the Node objects
        loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
    
        jobPropertiesCheck(project);
    
        // Create edges and find missing dependencies
        resolveDependencies();
    
        // Create the flows.
        buildFlowsFromDependencies();
    
        // Resolve embedded flows
        resolveEmbeddedFlows();
    
      }
    

    loadProjectFlow函数用于解析工作目录,解析job,构建工作流.

    第一步loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
    目的有两个,一将配置文件解析成类,二排除文件重复job
    加载.properties,.job文件,将加载的配置放入
    flowPropsList,propsList.(.properties)
    jobPropsMap,duplicateJobs,nodeMap.(
    .job)
    nodeMap存储的是所有的节点信息

    jobPropertiesCheck函数,检查job任务属性是否合格.azkaban中会限制每个任务最大内存和最小内存.如果超过job的社会超过限制,就会放入error中.

      private void jobPropertiesCheck(Project project) {
        // if project is in the memory check whitelist, then we don't need to check
        // its memory settings
        if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
            ProjectWhitelist.WhitelistType.MemoryCheck)) {
          return;
        }
    
        String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
        String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
        long sizeMaxXms = Utils.parseMemString(maxXms);
        long sizeMaxXmx = Utils.parseMemString(maxXmx);
    
        for (String jobName : jobPropsMap.keySet()) {
    
          Props jobProps = jobPropsMap.get(jobName);
          String xms = jobProps.getString(XMS, null);
          if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
              && Utils.parseMemString(xms) > sizeMaxXms) {
            errors.add(String.format(
                "%s: Xms value has exceeded the allowed limit (max Xms = %s)",
                jobName, maxXms));
          }
          String xmx = jobProps.getString(XMX, null);
          if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
              && Utils.parseMemString(xmx) > sizeMaxXmx) {
            errors.add(String.format(
                "%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
                jobName, maxXmx));
          }
    
          // job callback properties check
          JobCallbackValidator.validate(jobName, props, jobProps, errors);
        }
      }
    

    resolveDependencies回溯依赖关系,得到所有Edge,将无效依赖排除(依赖的job不存在,依赖的job是重复job)

    buildFlowsFromDependencies利用之前解析好Nodes,Edges,回溯形成flow.

    //这里所谓的根节点是末节点
    private void buildFlowsFromDependencies() {
        //找出所有的依赖节点
        // Find all root nodes by finding ones without dependents.
        HashSet<String> nonRootNodes = new HashSet<String>();
        for (Map<String, Edge> edges : nodeDependencies.values()) {
          for (String sourceId : edges.keySet()) {
            nonRootNodes.add(sourceId);
          }
        }
    
        // Now create flows. Bad flows are marked invalid
        Set<String> visitedNodes = new HashSet<String>();
        for (Node base : nodeMap.values()) {
          // Root nodes can be discovered when parsing jobs
          if (rootNodes.contains(base.getId())
              || !nonRootNodes.contains(base.getId())) {
            rootNodes.add(base.getId());
            Flow flow = new Flow(base.getId());
            Props jobProp = jobPropsMap.get(base.getId());
    
            // Dedup with sets
            @SuppressWarnings("unchecked")
            List<String> successEmailList =
                jobProp.getStringList(CommonJobProperties.SUCCESS_EMAILS,
                    Collections.EMPTY_LIST);
            Set<String> successEmail = new HashSet<String>();
            for (String email : successEmailList) {
              successEmail.add(email.toLowerCase());
            }
    
            @SuppressWarnings("unchecked")
            List<String> failureEmailList =
                jobProp.getStringList(CommonJobProperties.FAILURE_EMAILS,
                    Collections.EMPTY_LIST);
            Set<String> failureEmail = new HashSet<String>();
            for (String email : failureEmailList) {
              failureEmail.add(email.toLowerCase());
            }
    
            @SuppressWarnings("unchecked")
            List<String> notifyEmailList =
                jobProp.getStringList(CommonJobProperties.NOTIFY_EMAILS,
                    Collections.EMPTY_LIST);
            for (String email : notifyEmailList) {
              email = email.toLowerCase();
              successEmail.add(email);
              failureEmail.add(email);
            }
    
            flow.addFailureEmails(failureEmail);
            flow.addSuccessEmails(successEmail);
    
            flow.addAllFlowProperties(flowPropsList);
            constructFlow(flow, base, visitedNodes);//不断的递归,直到依赖为null为止
            flow.initialize();
            flowMap.put(base.getId(), flow);
          }
        }
      }
    

    相关文章

      网友评论

          本文标题:剖析azkaban解析作业流

          本文链接:https://www.haomeiwen.com/subject/firbbttx.html