美文网首页
温故知新:初识DataX

温故知新:初识DataX

作者: 灿烂的GL | 来源:发表于2024-08-15 10:26 被阅读0次

本文为学习笔记,会随着学习深入持续更新,仅供参考

一、datax干嘛的和之前的seatunnel使用上有什么区别

DataX 是阿里巴巴开发的一个开源数据同步工具。
DataX :更加专注于批量数据的同步和迁移,适用于数据仓库建设、数据备份和数据迁移等场景。主要支持批处理模式,通过配置 JSON 文件来定义数据同步任务。不依赖于其他大数据计算框架。
SeaTunnel :侧重于实时数据处理和批处理任务,适用于实时 ETL、数据流处理和数据分析等场景。支持流处理和批处理两种模式,更适合实时数据处理任务。基于 Apache Spark 和 Apache Flink 构建,具有分布式计算的优势和扩展性。


二、dataX实现数据全量同步举例

1. 安装 DataX

首先,需要安装 DataX。可以从 GitHub 上下载 DataX 源码或预编译的二进制包:

git clone https://github.com/alibaba/DataX.git

进入 DataX 目录并编译:

cd DataX
mvn clean package assembly:assembly -Dmaven.test.skip=true

2. 配置 JSON 文件

DataX 使用 JSON 文件来配置数据同步任务。以 MySQL 到 Hadoop 的数据同步为例,首先需要创建一个 JSON 配置文件,例如 mysql_to_hadoop.json
以下是一个简单的配置示例:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3  // 并行度,可以根据需要调整
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "your_mysql_username",
            "password": "your_mysql_password",
            "connection": [
              {
                "table": ["your_table_name"],
                "jdbcUrl": ["jdbc:mysql://your_mysql_host:3306/your_database"]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://your_hadoop_cluster",
            "fileType": "text",
            "path": "/user/hadoop/your_hdfs_path/",
            "fileName": "your_file_name",
            "column": [
              {"name": "column1", "type": "string"},
              {"name": "column2", "type": "int"},
              {"name": "column3", "type": "date"}
            ],
            "writeMode":"truncate",
            "fieldDelimiter": "\t"
          }
        }
      }
    ]
  }
}

3. 配置项说明

  • reader 部分:配置 MySQL 数据源的连接信息,包括用户名、密码、表名和 JDBC URL。
  • writer 部分:配置 Hadoop HDFS 的目标路径、文件类型(如 text、orc、parquet 等)、文件名、字段信息及写入模式。
    setting:任务的全局设置。
    在 DataX 的 hdfswriter 配置中,writeMode 参数用于指定写入数据到目标 HDFS 时的模式
    append:追加模式。如果目标路径下已经存在文件,将在文件末尾追加新数据,而不会覆盖原有数据。
    nonConflict:非冲突模式。如果目标路径下已经存在文件,任务会报错并终止,以避免覆盖现有文件。
    truncate:截断模式。如果目标路径下已经存在文件,将删除现有文件,然后写入新数据
    hadoopConfig:传递 Hadoop 相关配置项,如果配置了 dfs.replication 为 1,表示每个文件块只保留一个副本。生产环境可以相应提高。
    参数详细说明

4. 执行同步任务

在命令行中执行以下命令运行 DataX 任务:

python {DATAX_HOME}/bin/datax.py /path/to/your/mysql_to_hadoop.json

5. 检查同步结果

同步完成后,可以通过 HDFS 命令行或其他工具检查数据是否成功写入到指定的 HDFS 路径。

hdfs dfs -ls /user/hadoop/your_hdfs_path/
hdfs dfs -cat /user/hadoop/your_hdfs_path/your_file_name

三、DataX实现数据增量同步举例

a. 时间戳字段或自增字段:通过数据表中的时间戳字段(如 last_update_time)或自增字段(如 id),在每次同步时记录最后同步的位置,下次同步时只同步新增加或更新的数据。

b. 支持的插件:一些 DataX 插件支持增量同步,如 MySQL Reader 和 Writer 插件,通过配置 SQL 查询条件来实现增量同步。

示例配置

还是以mysql到hdfs为例。

1. 配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "${username}",
            "password": "${password}",
            "column": [
              "id", "name", "update_time"
            ],
            "splitPk": "id",
            "connection": [
              {
                "table": [
                  "${table}"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://${host}:${port}/${database}"
                ]
              }
            ],
            "where": "update_time > '${last_update_time}'"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://${hdfs_host}:${hdfs_port}",
            "fileType": "text",
            "path": "/path/to/hdfs/${table}",
            "fileName": "${table}",
            "column": [
              {"name": "id", "type": "bigint"},
              {"name": "name", "type": "string"},
              {"name": "update_time", "type": "timestamp"}
            ],
            "writeMode": "append",
            "fieldDelimiter": "\t",
            "compress": "gzip",
            "hadoopConfig": {
              "dfs.replication": "1"
            }
          }
        }
      }
    ]
  }
}

2. 使用脚本自动更新时间戳

为了实现真正的增量同步,需要在每次同步完成后更新 last_update_time。可以通过脚本来实现:

#!/bin/bash

# MySQL 配置信息
MYSQL_HOST="your_mysql_host"
MYSQL_PORT="your_mysql_port"
MYSQL_DATABASE="your_database"
MYSQL_USERNAME="your_username"
MYSQL_PASSWORD="your_password"
# HDFS 配置信息
HDFS_HOST="your_hdfs_host"
HDFS_PORT="your_hdfs_port"
# 定义变量
LAST_UPDATE_FILE="last_update_time.txt"
JOB_TEMPLATE="job_template.json"
TABLES=("table1" "table2" "table3")  # 需要同步的表名列表
# 读取最后一次同步的时间戳
if [ ! -f "$LAST_UPDATE_FILE" ]; then
  echo "1970-01-01 00:00:00" > $LAST_UPDATE_FILE
fi
LAST_UPDATE_TIME=$(cat $LAST_UPDATE_FILE)

# 遍历所有表并生成配置文件
for TABLE in "${TABLES[@]}"; do
  JOB_FILE="job_${TABLE}.json"
  sed -e "s/\${username}/${MYSQL_USERNAME}/g" \
      -e "s/\${password}/${MYSQL_PASSWORD}/g" \
      -e "s/\${host}/${MYSQL_HOST}/g" \
      -e "s/\${port}/${MYSQL_PORT}/g" \
      -e "s/\${database}/${MYSQL_DATABASE}/g" \
      -e "s/\${table}/${TABLE}/g" \
      -e "s/\${last_update_time}/${LAST_UPDATE_TIME}/g" \
      -e "s/\${hdfs_host}/${HDFS_HOST}/g" \
      -e "s/\${hdfs_port}/${HDFS_PORT}/g" \
      $JOB_TEMPLATE > $JOB_FILE
  # 执行 DataX 同步任务
  python /path/to/datax/bin/datax.py $JOB_FILE

  # 检查是否成功
  if [ $? -ne 0 ]; then
    echo "DataX job for table ${TABLE} failed!"
    exit 1
  fi
done
# 更新 last_update_time
NEW_LAST_UPDATE_TIME=$(date '+%Y-%m-%d %H:%M:%S')
echo $NEW_LAST_UPDATE_TIME > $LAST_UPDATE_FILE
echo "DataX incremental sync completed successfully."

四、其他问题

1、如果想设置mysql和hdfs中的字段不一致怎么设置?
  • reader

    • column:定义从 MySQL 读取的列名。这些列名需要与数据库中的实际列名一致。
  • writer

    • column:定义写入 HDFS 的列名和数据类型。这里可以与 reader 中的列名不同,实现字段名的映射和数据类型的转换。
注意事项
  1. 字段顺序:确保 readerwriter 中的列顺序一致,以保证数据正确映射。例如,reader 读取的第一个列 id 将映射到 writer 的第一个列 user_id
  2. 数据类型转换:DataX 在处理数据时会尝试进行类型转换,但为了避免数据丢失或格式错误,尽量确保源数据类型和目标数据类型兼容。例如,将 MySQL 的 DECIMAL 转换为 HDFS 的 DOUBLE
  3. 数据验证:在数据同步过程中,可以通过设置 errorLimit 来控制允许的错误数量和比例,以便及时发现和处理数据转换中的问题。

参考链接

1、官方文档

相关文章

网友评论

      本文标题:温故知新:初识DataX

      本文链接:https://www.haomeiwen.com/subject/hghekjtx.html