美文网首页
azkaban:azkaban-common:Project

azkaban:azkaban-common:Project

作者: raincoffee | 来源:发表于2018-01-31 18:32 被阅读731次

    azkaban源码:azkaban-common:Project

    由于种种原因,要基于azkaban二次开发一个适合自身的调度框架。所以进行源码的学习。方便后续开发。

    源码构建

    • build
      • gradlew clean
      • Gradlew build
      • gradlew installDist
    • create db
      • CREATE DATABASE azkaban;
      • run sql script
      • download jdbc jar

    Project

    该模块主要包含了project的一系列操作。

    假设我们现在有一个项目的zip文件。如何解决如下问题:

    • 如何创建一个project?
    • 一个project里面包含flow。这个flow是如何构建的?
    • 一个flow里面应该有多个job或者说node。又是如何构建的?
    • project flow 这些概念持久化存储在哪?
    • 如何查询project flow 信息?
    • 一个project中的flow如何调度?

    当然这里面也包含了user,permission等概念。

    首先 我们来看一下项目创建的流程。这里面只是创建project。还没有上传zip。

    • User 提交数据
    • ProjectManagerServlet 处理请求。
    • 调用ProjectManager创建project。
    • 调用ProjectLorder创建project。
    • 调用ProjectLorder实现类JDBCProjectImpl创建project
    • 调用DataBaseOperator提供的方法插入到数据库。
    版面 11

    接下来 我们来看一下项目上传的流程。==解析flow等信息都是在上传的过程中完成的。==

    • user提交zip
    • ProjectManagerServlet处理请求
    • 调用ProjectManager 上传project
    • 调用AzkabanProjectLoader上传project
      • 解压zip
      • 调用FlowLoaderFactory创建FlowLoader,创建Flow,并将Flow设置为Project里面的属性。
      • 调用StorageManager,内部调用projectloader上传信息到db。
      • 调用projectLoader,执行各种方法,持久化相关信息到DB。
    版面 12

    最后看一下其他查询请求的处理。

    handleGet

    • handleAJAXAction
      • ajaxFetchProjectLogEvents: projectManager.getProjectEventLogs(project, num, skip); 获取项目日志信息
      • ajaxFetchFlow
      • ajaxFetchFlowDetails
      • ajaxFetchFlowGraph
      • ajaxFetchFlowNodeData
      • ajaxFetchProjectFlows
      • ajaxChangeDescription
      • ajaxGetPermissions
      • ajaxGetGroupPermissions
      • ajaxGetProxyUsers
      • ajaxChangePermissions
      • ajaxAddPermission
      • ajaxAddProxyUser
      • ajaxRemoveProxyUser
      • ajaxFetchFlowExecutions
      • ajaxFetchLastSuccessfulFlowExecution
      • ajaxFetchJobInfo
      • ajaxSetJobOverrideProperty
    • handleProjectLogsPage
    • handlePermissionPage
    • handlePropertyPage
    • ...

    基本上类似上述过程。再次不做重复。

    通过上述的表述,我们可以发现。主要的project处理的的逻辑都集中在ProjectManager上。下面我们将详细的学习分析ProjectManager。

    ProjectManager

    project层次上的管理。单例模式。

    ==主要包含各种姿势的project的查询设置创建等。属于对外暴漏提供服务的类==。里面内部实现是由AzkabanProjectLoader,ProjectLoader等类提供服务实现的。

    getProjectNames()
    getUserProjects()
    getGroupProjects()
    getUserProjectsByRegex()
    getProjects()
    getProjectsByRegex()
    isActiveProject()
    getProject()
    createProject()
    purgeProject()
    removeProject()
    updateProjectDescription()
    ...
    

    整个调用的层次结构大概如下:

    • projectManager
      • AzkabanProjectLoader
        • ProjectLoader
        • StorageManager
          • ProjectLoader
          • Storage
        • FlowLoaderFactory
          • FlowLoader
      • ProjectLoader
        • JdbcProjectImpl
        • ...
    版面 8

    AzkabanProjectLoader

    Handles the downloading and uploading of projects.
    

    主要处理project的上传和下载。

    核心方法==uploadProject==过程。

    • 解压缩 file = unzipProject(archive, fileType);
    • 验证有效性 reports = validateProject(project, archive, file, prop);
    • 创建flowloader loader = this.flowLoaderFactory.createFlowLoader(file);
      • ==工厂模式==:根据AZKABAN_FLOW_VERSION创建不同的flowloader,即DirectoryYamlFlowLoader或者DirectoryFlowLoader
    • 上传project到db persistProject(project, loader, archive, file, uploader);
    • 清空目录 FlowLoaderUtils.cleanUpDir(file);
    • 返回reports

    ProjectLoader

    定义了各种接口。其应该有各种实现,默认使用jdbcprojectimpl实现。

    JdbcProjectImpl

    ProjectLoader的一种实现,This class implements ProjectLoader using new azkaban-db code to allow DB failover. 单例模式。

    其主要功能是提供存储project各种信息的持久化方法。以及这新信息获取的方法。

    具体的方法:

    • fetchAllActiveProjects:获取所有active的项目
    • fetchProjectById:通过projectid 获取project
    • fetchProjectByName:通过project name 获取project
    • createNewProject:根据名字 描述等创建新项目.
    • uploadProjectFile: 上传项目文件,这个函数挺有意思的。里面是读这个文件,然后将它分成每10M一个chuck,然后插入到数据库中,并更新相应的表。
    • addProjectVersion: 添加projectversion
    • fetchProjectMetaData:获取上传的项目元数据。返回类型是ProjectFileHandler,具体数据为里面的属性。
    • getUploadedFile:获取上传的file,返回类型是ProjectFileHandler,具体数据为里面的属性。其中查处所有的chuck,然后写入到文件中,再计算生成文件的md5与元数据中的md5是否一致,一直返回文件。不一致抛出异常。
    • changeProjectVersion:修改项目版本。
    • updateProjectSettings:更改项目配置。
    • removeProject:为假删除。SET active=false
    • postEvent
    • getProjectEvents
    • updateDescription
    • getLatestProjectVersion
    • uploadFlows: 该方法主要是上传一个project的flow。首先讲一个flow对象 转换成一个Map<String, Object>,再然后转换成json,再然后转换成byte[] 存入到数据库中。
    • fetchAllProjectFlows:查询一个project的所有flow。
    • uploadProjectProperty:上传project properties。
    • fetchProjectProperty
    • cleanOlderProjectVersion
    • uploadFlowFile 上传flow file 类似于uploadFlows
    • getUploadedFlowFile 获取flow file 类似于fetchAllProjectFlows
    • getLatestFlowVersion
    • isFlowFileUploaded

    StorageManager

    StorageManager manages and coordinates all interactions with the Storage layer. This also includes bookkeeping like updating DB with the new versionm, etc

    azkaban提供了多种存储层的实现方式,包括database,hdfs,localfile等。也可以自定义自己的存储层。默认使用的是database进行存储。

    其中这里面最重要的对外的类应该是==StorageManager==。主要提供了uploadproject和getprojectfile等方法。

    FlowLoaderFactory

    Factory class to generate flow loaders.
    

    flowLoader

    Interface to load project flows.

    包含以下重要接口:

    • ValidationReport loadProjectFlow(final Project project, final File projectDir);
    • Map<String, Flow> getFlowMap();

    更直白一点,就是提供如何从一个project中构建flow的功能。

    根据不同版本,flowloader提供了两种实现方式。

    • DirectoryYamlFlowLoader:flow信息存储在yaml文件中。
    • DirectoryFlowLoader:flow信息以directory的方式存在。
    版面 4

    NodeBeanLoader

    其实该部分 应该是如何构建flow,包括node,什么的。最终对外输出的是==FlowLoaderUtils==。里面包含了yaml方式的构建和一些公用的方法。

    • AzkabanNode
      • A unit of execution that could be either a job or a flow.
    • AzkabanFlow
      • Flow level definition of the DAG.
      • Contains a list of AzkabanNodes and related flow properties.
    • AzkabanJob
      • The smallest individual unit of execution in Azkaban.
      • Contains information about job type and related properties.
    版面 3

    数据结构

    • node的内部属性:
      • level: 距离起始节点的距离(多条路径的话取最大值),起始节点level为0
      • jobSource
      • propsSource
    • Edge的内部属性:
      • id:getSourceId() + ">>" + getTargetId()
      • sourceId
      • source
      • targetId
      • target
      • error
    • flow的内部属性:
      • inEdges:key为jobName,value为节点的入边集合
      • outEdges:key为jobName,value为节点的出边集合
      • startNodes:起始Node集合,没有入边的谓之起始节点
      • endNodes:结束Node结合,没有出边的谓之结束节点
      • 每个flow里能配置报警等信息

    AZ.Storage

    Storage:
    /**
     * The Azkaban Storage interface would facilitate getting and putting objects into a storage
     * mechanism of choice. By default, this is set to the MySQL database. However, users can have the
     * ability to choose between multiple storage types in future.
     *
     * This is different from storing Azkaban state in MySQL which would typically be maintained in a
     * different database.
     *
     * Note: This is a synchronous interface.
     */
    
    版面 7

    Az.DB

    该模块主要是azkaban db相关代码。包含以下功能。

    • 建立jdbc连接,获取datasource
    • 执行sql操作接口。

    如需扩展其他链接,需要继承==AzkabanDataSource==。实现相应功能。另外,对外使用 主要使用AzkabanDataSource 和DatabaseOperator两个类。

    版面 5

    相关文章

      网友评论

          本文标题:azkaban:azkaban-common:Project

          本文链接:https://www.haomeiwen.com/subject/lcplzxtx.html