美文网首页数据库
数据迁移工具DataX入门

数据迁移工具DataX入门

作者: 文景大大 | 来源:发表于2023-01-13 17:32 被阅读0次

一、DataX是什么

DataX是阿里巴巴开源的离线数据同步工具,实现了包括主流RDBMS数据库、NoSQL、大数据计算系统在内的多种异构数据源之间高效进行数据同步的功能。

二、为什么要使用DataX

DataX设计理念

为了解决异构数据源的同步问题,DataX将复杂的网状同步链路优化成了星型数据链路,由DataX作为中间传输载体来负责连接各种数据源,以此来降低整个异构数据源同步链路的复杂度。当需要新接入一个数据源的时候,只需要考虑将该新的数据源对接到DataX即可,就能跟已有的所有数据源无缝同步。

DataX架构设计

DataX由FrameWork+Plugin的形式构建,数据源的读取和写入分别Reader和Writer实现:

  • Reader,数据采集模块,负责采集数据源中的数据,并将数据发送给FrameWork;
  • Writer,数据写入模块,负责从Framework中取数据,并将数据写入到数据源中;
  • Framework,用于连接Reader和Writer,作为以上两者的数据传输通道,处理缓冲、流量控制、并发、数据转换等核心问题。
DataX工作原理

DataX的工作模式是单机多线程形式,不支持分布式的方式,这是它和其它数据同步工具的重要区别之一。

  1. 每一个数据同步作业,我们称之为Job,在DataX收到一个Job之后,就启动一个进程来完成整个作业的过程。如果有多个数据同步作业需要同时执行,那就要么排队等待,要么再启动一个DataX进程。Job模块是作业的中枢管理节点,承担了数据清理、子任务切分、TaskGroup管理等功能。

  2. Job启动后,会根据不同的源端切分策略,将Job切分为多个小的任务Task,以便于多线程并发执行它们。Task便是DataX中的最小执行单元,每一个Task都负责一部分数据的同步工作。

  3. 切分好Task之后,Job会调用Scheduler模块,根据配置的并发任务数将Task重新组合,组装成TaskGroup,每一个TaskGroup负责以一定的并发度来运行分配好的所有Task,默认情况下的并发度为5。

  4. 每一个Task都有TaskGroup负责启动和控制,Task启动后,会按照上图中介绍的Reader-Channel-Writer来完成其对应的数据同步工作。

  5. Job负责监控并等待多个TaskGroup任务完成后就退出,否则异常退出。

DataX还具有如下的核心优势:

  1. 可靠的数据质量监控,能解决数据传输过程中类型失真问题,提供作业全链路流量和数据量的监控,并提供脏数据探测功能;
  2. 丰富的数据转换功能,作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能,还可以让数据在传输过程中轻松完成数据脱敏、数据补全、数据过滤等数据转换功能,还可以让用户自定义转换函数使用;
  3. 精准的速率控制,提供了通道控制、记录流控制、字节流控制三种流控模式,可以随意控制作业速率,让作业在存储介质可承受的范围内达到最佳的同步速率;
  4. 强劲的同步性能;
  5. 健壮的容错机制,在遇到外部因素干扰(网络、数据源不稳定)情况下,会自动进行线程级别、进程级别、作业级别的局部或者全局重试,保证用户作业的稳定运行;
  6. 极简使用体验,提供Linux和Windows版本,下载即可使用,作业过程中,会打印大量的关键信息,包括传输速度、插件性能、进程的CPU、JVM和GC情况等,任务结束后还会打印该任务的总体运行情况。

和其它大数据ETL工具相比:

功能 DataX Sqoop
运行模式 单进程多线程 MR分布式
MySQL读写 单机压力大,读写粒度容易控制 MR模式重,出错后处理麻烦
Hive读写 单机压力大 很好
文件格式 orc支持 orc不支持,可添加
分布式 不支持,可通过调度框架规避 支持
流控 无,需要定制
统计信息 无,分布式的数据搜集不方便
数据校验 无,分布式的数据搜集不方便
监控 无,需要定制

如果需要支持的数据源比较多,建议使用DataX,如果数据来源比较单一,且只是要导入到HDFS,流程很简单,可以考虑使用Sqoop。

三、如何使用DataX

3.1 下载和安装

  • 准备好Linux服务器;
  • 安装Java运行环境;
  • 安装Python运行环境;
  • 下载Datax工具包,解压缩到合适的目录;

进入到datax的bin目录下,运行自带的示例:

python datax.py /root/datax/datax/job/job.json

运行后控制台显示运行成功即表示DataX安装完成。

3.2 Job描述介绍

当你决定使用某个Reader和Writer之后,我们可以通过命令来获取一个模板Json:

# 指定需要的Reader和Writer来获取模板Json
python datax.py -r streamreader -w streamwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

# 指定需要的Reader和Writer的文档给地址
Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

# 说明定制好的Json如何启动
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        # 指定列信息
                        "column": [],
                        # 指定记录条数
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        # 是否将数据打印到控制台
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                # 指定并发度,可以是channel、bytes、records三种类型
                "channel": ""
            }
        }
    }
}

大体分为四个部分:

  • Job描述
  • Reader描述和配置
  • Writer描述和配置
  • Job参数配置

我们复制如上json内容,新建一个stream.json放到/root/zx-test下面,内容设置如下:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "type":"string",
                                "value":"zx-test"
                            },
                            {
                                "type":"string",
                                "value":"999"
                            }
                        ],
                        "sliceRecordCount": "10"
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

然后输入命令执行:

python datax.py /root/zx-test/stream.json

...
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
2023-01-14 11:00:32.389 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[101]ms
2023-01-14 11:00:32.390 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2023-01-14 11:00:42.283 [job-0] INFO  StandAloneJobContainerCommunicator - Total 10 records, 100 bytes | Speed 10B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-01-14 11:00:42.284 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2023-01-14 11:00:42.284 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2023-01-14 11:00:42.284 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do post work.
2023-01-14 11:00:42.284 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
...

3.3 mysql-hdfs案例

前提条件:

  1. 准备一个MySQL数据库;
  2. 准备一个健康的HDFS集群;

首先,我们创建一张数据表,并创建一些数据:

CREATE TABLE `zx_user` (
  `user_id` bigint(20) NOT NULL COMMENT '用户ID',
  `user_name` varchar(30) DEFAULT NULL COMMENT '用户姓名',
  `age` int(11) DEFAULT NULL COMMENT '用户年龄',
  `user_email` varchar(50) DEFAULT NULL COMMENT '用户邮箱',
  `create_by` varchar(100) DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `update_by` varchar(100) DEFAULT NULL,
  `update_date` datetime DEFAULT NULL,
  `deleted` int(11) DEFAULT '0' COMMENT '0-未删除;1-已删除',
  PRIMARY KEY (`user_id`),
  UNIQUE KEY `zx_user_un` (`user_id`),
  UNIQUE KEY `zx_user_un2` (`user_email`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO zx_user (user_id,user_name,age,user_email,create_by,create_date,update_by,update_date,deleted) VALUES
     (1,'Jone',18,'test1@baomidou.com',NULL,NULL,NULL,NULL,0),
     (2,'Jack',20,'test2@baomidou.com',NULL,NULL,NULL,NULL,0),
     (5,'Anna',27,'test5@baomidou.com',NULL,NULL,NULL,NULL,0),
     (7,'Anna',27,'test7@baomidou.com','System','2021-06-17 14:36:03','System','2021-06-17 14:54:51',0);

然后我们需要获取一个mysql-hdfs案例的json示例:python datax.py -r mysqlreader -w hdfswriter,并根据实际情况进行修改:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "user_id",
                            "user_name",
                            "age",
                            "user_email"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://x.x.x.x:3306/zhangxun"
                                ],
                                "table": [
                                    "zx_user"
                                ]
                            }
                        ],
                        "password": "******",
                        "username": "root",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name":"user_id",
                                "type":"BIGINT"
                            },
                            {
                                "name":"user_name",
                                "type":"STRING"
                            },
                            {
                                "name":"age",
                                "type":"INT"
                            },
                            {
                                "name":"user_email",
                                "type":"STRING"
                            }
                        ],
                        "compress": "NONE",
                        "defaultFS": "hdfs://x.x.x.x:9000",
                        "fieldDelimiter": ",",
                        "fileName": "zx_user",
                        "fileType": "text",
                        "path": "/zx/datax",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

关于mysqlreader和hdfswriter的详细参数配置项可以参考官方对应插件的文档,上面写的都很详细。

然后我们执行命令开始Job:

python datax.py /root/zx-test/mysql2hdfs.json
...
2023-01-14 15:03:15.319 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
...
2023-01-14 15:03:15.422 [job-0] INFO  JobContainer -
任务启动时刻                    : 2023-01-14 15:03:04
任务结束时刻                    : 2023-01-14 15:03:15
任务总计耗时                    :                 11s
任务平均流量                    :               10B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   4
读写失败总数                    :                   0

显示执行成功了,我们打开HDFS文件浏览器查看文件确实已经存在了:

mysql2hdfs结果

我们查看其中的内容为:

[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user__d81bd99e_0d6f_45a1_9a80_04ca475fc83d
1,Jone,18,test1@baomidou.com
2,Jack,20,test2@baomidou.com
5,Anna,27,test5@baomidou.com
7,Anna,27,test7@baomidou.com

3.4 hdfs-mysql案例

在以上3.3案例的基础上,我们首先获取一个案例json,python datax.py -r hdfswriter -w mysqlreader,然后修改成需要的:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": ["*"],
                        "defaultFS": "hdfs://x.x.x.x:9000",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "path": "/zx/datax/zx_user.txt"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "user_id",
                            "user_name",
                            "age",
                            "user_email"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://x.x.x.x:3306/zhangxun",
                                "table": [
                                    "zx_user"
                                ]
                            }
                        ],
                        "password": "******",
                        "preSql": [],
                        "session": [],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

然后,我们准备一个zx_user.txt,编辑内容如下:

11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com
[root@bigdata01 hadoop]# hdfs dfs -put /root/zx-test/zx_user.txt /zx/datax
[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user.txt
11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com

如此将需要同步到MySQL数据库的数据文件准备好了,然后,我们执行同步命令:

python datax.py /root/zx-test/hdfs2mysql.json
...
2023-01-14 17:22:04.815 [job-0] INFO  JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2023-01-14 17:22:04.815 [job-0] INFO  JobContainer - DataX Reader.Job [hdfsreader] do post work.
2023-01-14 17:22:04.816 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
...
任务启动时刻                    : 2023-01-14 17:21:53
任务结束时刻                    : 2023-01-14 17:22:04
任务总计耗时                    :                 11s
任务平均流量                    :                7B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0

Job显示执行成功了,我们查询下数据库表中的数据,发现数据为:

1   Jone    18  test1@baomidou.com                  0
2   Jack    20  test2@baomidou.com                  0
5   Anna    27  test5@baomidou.com                  0
7   Anna    27  test7@baomidou.com  System  2021-06-17 14:36:03 System  2021-06-17 14:54:51 0
11  slide   21  slide@baomidou.com                  0
21  mify    20  mify@baomidou.com                   0
51  kitty   27  kitty@baomidou.com                  0

如此表示导入成功了。

四、总结

DataX由于是阿里巴巴开源的,中文文档比较完善,各种插件的说明教程也很全,基本很容易上手,本文旨在帮助入门,一些高级主题还需要另外实验和学习。

参考文档:

alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。 (github.com)

DataX/introduction.md at master · alibaba/DataX (github.com)

DataX/userGuid.md at master · alibaba/DataX (github.com)

相关文章

网友评论

    本文标题:数据迁移工具DataX入门

    本文链接:https://www.haomeiwen.com/subject/kkwgcdtx.html