美文网首页JAVA藏兵谷
阿里开源数据同步工具——DataX源码揭秘

阿里开源数据同步工具——DataX源码揭秘

作者: 来杯熊酱不加糖 | 来源:发表于2019-09-29 22:25 被阅读0次

    1.前言

    datax是阿里出品,最初是为了解决淘宝数据交换的问题,据说淘宝有30%的数据交换是通过datax完成的。

    2.介绍

    DataX 是一个开源异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

    Data目前已经支持常用的插件体系,主流的RDBMS,NOSQL,大数据计算系统都已接入。

    3.源码解析

    从github上clone源码到本地,源码地址:https://github.com/alibaba/DataX

    DataX源码由Framework(core包,common包和transformer包)及 plugin(ReadPlugin和WritePlugin)组成。

    Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

    Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

    3.1入口类Engine

      \bullet entry()方法:

      主要用于获取项目启动参数:job,jobid,mode;

      注意:mode分为单机模式和分布式模式,这里指定为standalone 单机模式。

                jobid默认值为-1,只有在standalone模式下使用,非 standalone 模式必须提供有效的jobid值。

    public static void entry(String jobPath)throws Throwable {

            String jobIdString ="-1";

    // 指定单机还是分布式模式运行

            RUNTIME_MODE ="standalone";

    Configuration configuration = ConfigParser.parse(jobPath);

    ......

    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

    ......

    ConfigurationValidate.doValidate(configuration);

    Engine engine =new Engine();

    engine.start(configuration);

    }

    \bullet start()方法:

    主要用于初始化配置,检查job的model信息。

    public void start(Configuration allConf) {

    // 绑定column转换信息

        ColumnCast.bind(allConf);

    /**

    * 初始化PluginLoader,可以获取各种插件配置

    */

        LoadUtil.bind(allConf);

    ......

    Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);

    //初始化PerfTrace

        PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);

    perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);

    container.start();

    }

    3.2 jobContainer容器

    job实例运行在jobContainer容器中,它是所有任务的master,负责初始化、拆分、调度、运行、回收、监控和汇报。

    \bullet start()方法:

    jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、post以及destroy和statistics。

    public void start() {

    LOG.info("DataX jobContainer starts job.");

    this.preHandle();

    this.init();

    this.prepare();

    this.totalStage =this.split();

    this.schedule();

    this.post();

    this.postHandle();

    this.invokeHooks();

    }

    \bullet init()方法:reader和writer的初始化

    private void init() {

    ......

    JobPluginCollector jobPluginCollector =new DefaultJobPluginCollector(

    this.getContainerCommunicator());

    //必须先Reader ,后Writer

    this.jobReader =this.initJobReader(jobPluginCollector);

    this.jobWriter =this.initJobWriter(jobPluginCollector);

    }

    \bullet schedule()方法:

    任务调度器schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中。

    private void schedule() {

    /**

    * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务

    */

        List taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,

    this.needChannelNumber, channelsPerTaskGroup);

    ......

    AbstractScheduler scheduler;

    scheduler = initStandaloneScheduler(this.configuration);

    scheduler.schedule(taskGroupConfigs);

    ......

    /** * 检查任务执行情况 */this.checkLimit();

    this.checkLimit();

    }

    \bullet post()方法: 启动各类数据库插件的读写任务。

    private void post() {

    this.postJobWriter();

    this.postJobReader();

    }

    4. Spring Boot集成DataX

    在springboot项目上,通过POM文件引入datax相关jar包

    <dependency>

    <groupId>com.alibaba.datax</groupId>

    <artifactId>datax-core</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    </dependency>

    同时需要引入数据源读取和写入相关的Reader/Writer插件

    <dependency>

    <groupId>com.oracle</groupId>

    <artifactId>ojdbc6</artifactId>

    <version>11.2.0.3</version>

    <scope>system</scope>

    <systemPath>${basedir}/src/main/lib/ojdbc6-11.2.0.3.jar</systemPath>

    </dependency>

    <dependency>

    <groupId>com.microsoft.sqlserver</groupId>

    <artifactId>sqljdbc4</artifactId>

    <version>4.0</version>

    <scope>system</scope>

    <systemPath>${basedir}/src/main/lib/sqljdbc4-4.0.jar</systemPath>

    </dependency>

    这里引入mysql 及oracle数据源对应的插件

    相关文章

      网友评论

        本文标题:阿里开源数据同步工具——DataX源码揭秘

        本文链接:https://www.haomeiwen.com/subject/cotxyctx.html