Extending DataX with a hudiwriter Plugin


Author: Coder小咚 | Published 2022-10-06 17:54

    Preface

    Our company recently kicked off a project, planned to run for two years, to build a data platform. It covers data synchronization (real-time/offline), mapping (real-time/offline), a data warehouse (real-time/offline), source data management, data lineage, scheduling, BI, and more. The architecture is layered: business applications on top, middle-platform services in the middle, and base capabilities at the bottom.

    The project is ambitious, but every tall building rises from the ground, so we started at the source: data ingestion. Synchronization is either real-time or batch, and batch further splits into full loads, incremental loads, and incremental updates. The official DataX already supports full and incremental synchronization, but not incremental updates, so I decided to extend DataX with a hudiwriter plugin to cover update scenarios.

    DataX architecture


    DataX is built on a Framework + plugin architecture. The framework handles buffering, rate limiting, concurrency, context loading, and other plumbing; reading from and writing to data sources is abstracted behind Reader/Writer interfaces. If the built-in plugins do not cover your scenario, you can write your own plugin.

    From a business point of view, plugins fall into readers and writers:

    • The reader is the data collection module: it reads data from the source and hands it to the framework.
    • The writer is the data writing module: it keeps pulling data from the framework and writes it to the destination.

    Functionally, a plugin is divided into a Job and its Tasks:

    • A Job describes a synchronization from one source to one destination and is the smallest business unit of a DataX sync, for example copying one MySQL table into a specific partition of an ODPS table.
    • A Task is the smallest execution unit obtained by splitting a Job to maximize parallelism; for example, a Job reading a MySQL table sharded into 1024 sub-tables is split into 1024 read Tasks that run on a pool of concurrent channels (see the skeleton sketch after this list).
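
    To make this concrete, here is a minimal sketch of the skeleton that any writer plugin follows (a hypothetical DemoWriter for illustration; the real hudiwriter code is listed in full below): the Job clones its configuration once per task in split(), and the framework then drives each Task by handing it records through a RecordReceiver.

    package com.alibaba.datax.plugin.writer.demowriter;

    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.plugin.RecordReceiver;
    import com.alibaba.datax.common.spi.Writer;
    import com.alibaba.datax.common.util.Configuration;

    import java.util.ArrayList;
    import java.util.List;

    public class DemoWriter extends Writer {
        public static class Job extends Writer.Job {
            private Configuration originalConfig;

            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
            }

            @Override
            public List<Configuration> split(int mandatoryNumber) {
                // One cloned configuration per task; the framework derives
                // mandatoryNumber from the configured channel count.
                List<Configuration> configs = new ArrayList<>();
                for (int i = 0; i < mandatoryNumber; i++) {
                    configs.add(originalConfig.clone());
                }
                return configs;
            }

            @Override
            public void destroy() {
            }
        }

        public static class Task extends Writer.Task {
            @Override
            public void init() {
            }

            @Override
            public void startWrite(RecordReceiver receiver) {
                Record record;
                while ((record = receiver.getFromReader()) != null) {
                    // convert each DataX Record and write it to the target system
                }
            }

            @Override
            public void destroy() {
            }
        }
    }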

    Project structure

    hudiwriter
    • HudiWriter: the main write flow
    • Key, HudiWriterErrorCode: helper classes used by the business logic, not strictly required
    • package.xml: the project-wide assembly descriptor; add the plugin's packaging entries here (a sketch follows this list)
    • plugin.json: describes the plugin itself; the key fields are name (the plugin name) and class (the plugin's entry class), and both must be exact
    • plugin_job_template.json: a sample job configuration for the plugin
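
    Besides these files, the new module has to be wired into the DataX build. As a rough sketch (paths assume the standard DataX layout; verify them against your own checkout), register hudiwriter in the root pom.xml's <modules> section and add a fileSet to the root package.xml so the built plugin lands in the distribution:

    <!-- sketch of the package.xml entry; adjust paths to your checkout -->
    <fileSet>
        <directory>hudiwriter/target/datax/</directory>
        <includes>
            <include>**/*.*</include>
        </includes>
        <outputDirectory>datax</outputDirectory>
    </fileSet>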

    Code

    GitHub: https://github.com/dongpengfei2/DataX/tree/evyd-1.0.0

    HudiWriter.java

    package com.alibaba.datax.plugin.writer.hudiwriter;
    
    import com.alibaba.datax.common.element.Column;
    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordReceiver;
    import com.alibaba.datax.common.spi.Writer;
    import com.alibaba.datax.common.util.Configuration;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.commons.lang3.exception.ExceptionUtils;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hudi.client.HoodieJavaWriteClient;
    import org.apache.hudi.client.common.HoodieJavaEngineContext;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.model.HoodieAvroPayload;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.io.IOException;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.*;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.stream.Collectors;
    
    import static com.alibaba.datax.plugin.writer.hudiwriter.HudiWriterErrorCode.HUDI_ERROR_TABLE;
    import static com.alibaba.datax.plugin.writer.hudiwriter.HudiWriterErrorCode.HUDI_PARAM_LOST;
    import static com.alibaba.datax.plugin.writer.hudiwriter.Key.*;
    
    /**
     * Created by david.dong on 22-8-21.
     */
    public class HudiWriter extends Writer {
        public static class Job extends Writer.Job {
    
            private static final Logger LOG = LoggerFactory.getLogger(Job.class);
    
            private Configuration originalConfig;
    
            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
            }
    
            @Override
            public void prepare() {
    
            }
    
            @Override
            public void post() {
    
            }
    
            @Override
            public void destroy() {
    
            }
    
            @Override
            public List<Configuration> split(int mandatoryNumber) {
                List<Configuration> list = new ArrayList<>();
                for (int i = 0; i < mandatoryNumber; i++) {
                    list.add(originalConfig.clone());
                }
                return list;
            }
    
        }
    
        public static class Task extends Writer.Task {
            private static final Logger LOG = LoggerFactory.getLogger(Task.class);
            private String primaryKey;
            private String partitionFields;
            private String writeOption;
            private int batchSize;
            private Configuration sliceConfig;
            private List<Configuration> columnsList;
    
            private List<String> partitionList;
    
            Schema avroSchema;
    
            private HoodieJavaWriteClient<HoodieAvroPayload> client;
    
            @Override
            public void init() {
                // Fetch the configuration slice for this task
                this.sliceConfig = super.getPluginJobConf();
                String tableName = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_NAME, HUDI_ERROR_TABLE);
                String tablePath = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_PATH, HUDI_PARAM_LOST);
                String tableType = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_TYPE, HUDI_PARAM_LOST);
                primaryKey = sliceConfig.getNecessaryValue(Key.HUDI_PRIMARY_KEY, HUDI_PARAM_LOST);
                partitionFields = sliceConfig.getString(Key.HUDI_PARTITION_FIELDS);
                writeOption = sliceConfig.getNecessaryValue(Key.HUDI_WRITE_OPTION, HUDI_PARAM_LOST);
                columnsList = sliceConfig.getListConfiguration(Key.HUDI_COLUMN);
                batchSize = sliceConfig.getInt(HUDI_BATCH_SIZE);
    
                partitionList = StringUtils.isEmpty(partitionFields) ? new ArrayList<>() : Arrays.asList(partitionFields.split(","));
    
                org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
                try {
                    // Check whether Kerberos authentication is enabled
                    Boolean haveKerberos = sliceConfig.getBool(HAVE_KERBEROS, false);
                    if(haveKerberos){
                        String kerberosKeytabFilePath = sliceConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
                        String kerberosPrincipal = sliceConfig.getString(Key.KERBEROS_PRINCIPAL);
                        hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
                        this.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
                    }
                    // Initialize HDFS; create the Hudi table metadata under tablePath if it does not exist yet
                    Path path = new Path(tablePath);
                    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
                    if (!fs.exists(path)) {
                        HoodieTableMetaClient.withPropertyBuilder()
                            .setTableType(HUDI_WRITE_TYPE_MOR.equals(tableType) ? HoodieTableType.MERGE_ON_READ : HoodieTableType.COPY_ON_WRITE)
                            .setTableName(tableName)
                            .setPayloadClassName(HoodieAvroPayload.class.getName())
                            .initTable(hadoopConf, tablePath);
                    }
                } catch (IOException e) {
                    LOG.error(ExceptionUtils.getStackTrace(e));
                }
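                // Build an Avro schema from the column configuration; date/datetime columns are declared as string and written as formatted text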
                JSONArray fields = new JSONArray();
                for (Configuration columnConfig : columnsList) {
                    JSONObject confObject = new JSONObject();
                    confObject.put("name", columnConfig.getString("name"));
                    String configType = columnConfig.getString("type");
                    confObject.put("type", "date".equals(configType) || "datetime".equals(configType) ? "string" : configType);
                    fields.add(confObject);
                }
    
                JSONObject schemaObject = new JSONObject();
                schemaObject.put("type", "record");
                schemaObject.put("name", "triprec");
                schemaObject.put("fields", fields);
                String schemaStr = schemaObject.toJSONString();
    
                avroSchema = new Schema.Parser().parse(schemaStr);
    
                // Create the write client to write some records in
                HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
                    .withSchema(schemaStr).withParallelism(2, 2)
                    .withDeleteParallelism(2).forTable(tableName)
                    .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
                    .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
                client =
                    new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
            }
    
            @Override
            public void prepare() {
    
            }
    
            @Override
            public void startWrite(RecordReceiver recordReceiver) {
                Record record;
                DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
                DateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                AtomicLong counter = new AtomicLong(0);
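                // Buffer incoming records and flush them to Hudi in batches of batchSize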
                List<HoodieRecord<HoodieAvroPayload>> writeRecords = new ArrayList<>();
                while ((record = recordReceiver.getFromReader()) != null) {
                    GenericRecord row = new GenericData.Record(avroSchema);
                    for (int i=0; i<columnsList.size(); i++) {
                        Configuration configuration = columnsList.get(i);
                        String columnName = configuration.getString("name");
                        String columnType = configuration.getString("type");
                        Column column = record.getColumn(i);
                        Object rawData = column.getRawData();
                        if (rawData == null) {
                            row.put(columnName, null);
                            continue;
                        }
                        switch (columnType) {
                            case "int":
                                row.put(columnName, Integer.parseInt(rawData.toString()));
                                break;
                            case "float":
                                row.put(columnName, Float.parseFloat(rawData.toString()));
                                break;
                            case "double":
                                row.put(columnName, Double.parseDouble(rawData.toString()));
                                break;
                            case "date":
                                row.put(columnName, dateFormat.format(rawData));
                                break;
                            case "datetime":
                                row.put(columnName, dateTimeFormat.format(rawData));
                                break;
                            case "string":
                            default:
                                row.put(columnName, rawData.toString());
                        }
                    }
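                    // Join the configured partition field values with '/' to form the Hudi partition path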
                    String partitionPath = "";
                    if (!partitionList.isEmpty()) {
                        List<Object> values = partitionList.stream().map(row::get).collect(Collectors.toList());
                        partitionPath = StringUtils.join(values, "/");
                    }
                    HoodieKey key = new HoodieKey(row.get(primaryKey).toString(), partitionPath);
                    HoodieRecord<HoodieAvroPayload> hoodieAvroPayload = new HoodieRecord<>(key, new HoodieAvroPayload(Option.of(row)));
                    writeRecords.add(hoodieAvroPayload);
                    long num = counter.incrementAndGet();
    
                    if (num >= batchSize) {
                        flushCache(writeRecords);
                        writeRecords.clear();
                        counter.set(0L);
                    }
                }
                if (!writeRecords.isEmpty()) {
                    flushCache(writeRecords);
                }
            }
    
            private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf){
                if(StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)){
                    UserGroupInformation.setConfiguration(hadoopConf);
                    try {
                        UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
                    } catch (Exception e) {
                        String message = String.format("Kerberos authentication failed. Please verify that kerberosKeytabFilePath [%s] and kerberosPrincipal [%s] are correct",
                                kerberosKeytabFilePath, kerberosPrincipal);
                        LOG.error(message);
                        throw DataXException.asDataXException(HudiWriterErrorCode.KERBEROS_LOGIN_ERROR, e);
                    }
                }
            }
    
            private void flushCache(List<HoodieRecord<HoodieAvroPayload>> writeRecords) {
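                // Each batch is written in its own Hudi commit, dispatched by the configured writeOption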
                String commitTime = client.startCommit();
                LOG.info("Starting commit " + commitTime);
                switch (writeOption) {
                    case HUDI_WRITE_OPTION_INSERT:
                        client.insert(writeRecords, commitTime);
                        break;
                    case HUDI_WRITE_OPTION_BULK_INSERT:
                        client.bulkInsert(writeRecords, commitTime);
                        break;
                    case HUDI_WRITE_OPTION_UPSERT:
                        client.upsert(writeRecords, commitTime);
                        break;
                }
            }
    
            @Override
            public void post() {
    
            }
    
            @Override
            public void destroy() {
                if (client!=null) {
                    client.close();
                }
            }
        }
    }
    

    Key.java

    package com.alibaba.datax.plugin.writer.hudiwriter;
    
    public class Key {
        public static final String HUDI_TABLE_NAME = "tableName";
        public static final String HUDI_TABLE_PATH = "tablePath";
        public static final String HUDI_PRIMARY_KEY = "primaryKey";
        public static final String HUDI_PARTITION_FIELDS = "partitionFields";
        public static final String HUDI_TABLE_TYPE = "tableType";
        public static final String HUDI_BATCH_SIZE = "batchSize";
        public static final String HUDI_WRITE_OPTION = "writeOption";
        public static final String HUDI_COLUMN = "column";
    
        public static final String HUDI_WRITE_OPTION_INSERT = "insert";
        public static final String HUDI_WRITE_OPTION_BULK_INSERT = "bulk_insert";
        public static final String HUDI_WRITE_OPTION_UPSERT = "upsert";
    
        public static final String HUDI_WRITE_TYPE_COW = "cow";
        public static final String HUDI_WRITE_TYPE_MOR = "mor";
    
        // Kerberos
        public static final String HAVE_KERBEROS = "haveKerberos";
        public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
        public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
    
        public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
    }
    

    HudiWriterErrorCode.java

    package com.alibaba.datax.plugin.writer.hudiwriter;
    
    import com.alibaba.datax.common.spi.ErrorCode;
    
    public enum HudiWriterErrorCode implements ErrorCode {
    
        HUDI_ERROR_TABLE("Hudi Error Table", "Invalid table parameter configuration."),
        HUDI_PARAM_LOST("Hudi Param Lost", "A required parameter is missing."),
        HDFS_CONNECT_ERROR("Hdfs Connect Error", "IO exception while connecting to HDFS."),
        KERBEROS_LOGIN_ERROR("Hdfs Login Error", "Kerberos authentication failed");
    
        private final String code;
        private final String description;
    
        HudiWriterErrorCode(String code, String description) {
            this.code = code;
            this.description = description;
        }
    
        @Override
        public String getCode() {
            return this.code;
        }
    
        @Override
        public String getDescription() {
            return this.description;
        }
    
        @Override
        public String toString() {
            return String.format("Code:[%s], Description:[%s].", this.code,
                    this.description);
        }
    }
    

    plugin.json

    {
        "name": "hudiwriter",
        "class": "com.alibaba.datax.plugin.writer.hudiwriter.HudiWriter",
        "description": "useScene: test. mechanism: use datax framework to transport data to hudi. warn: The more you know about the data, the less problems you encounter.",
        "developer": "alibaba"
    }
    

    plugin_job_template.json

    {
        "name": "hudiwriter",
        "parameter": {
            "tableName": "",
            "tablePath": "",
            "tableType": "",
            "writeOption": "",
            "primaryKey": "",
            "partitionFields": "",
            "batchSize": "",
            "column": []
        }
    }
    

    Test

    wlapp_user_mysql_to_hudi.json

    {
        "job": {
            "setting": {
                "speed": {
                    "channel": 2
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": [
                                "id",
                                "name",
                                "age",
                                "dt",
                                "score",
                                "create_at",
                                "update_at"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        "jdbc:mysql://127.0.0.1:3306/wlapp?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                                    ],
                                    "table": [
                                        "user"
                                    ]
                                }
                            ],
                            "password": "123456",
                            "username": "root",
                            "where": ""
                        }
                    },
                    "writer": {
                        "name": "hudiwriter",
                        "parameter": {
                            "tableName": "user",
                            "tablePath": "hdfs://localhost:9000/user/hive/warehouse/wlapp.db/user",
                            "tableType": "mor",
                            "writeOption": "upsert",
                            "primaryKey": "id",
                            "partitionFields": "dt",
                            "batchSize": 100,
                            "column": [
                                {
                                    "name": "id",
                                    "type": "int"
                                },
                                {
                                    "name": "name",
                                    "type": "string"
                                },
                                {
                                    "name": "age",
                                    "type": "int"
                                },
                                {
                                    "name": "dt",
                                    "type": "date"
                                },
                                {
                                    "name": "score",
                                    "type": "double"
                                },
                                {
                                    "name": "create_at",
                                    "type": "datetime"
                                },
                                {
                                    "name": "update_at",
                                    "type": "datetime"
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    

    Launch command

    python bin/datax.py job/wlapp_user_mysql_to_hudi.json;
    

    Launch command with parameters

    # Replaces the ${date} variable in wlapp_user_mysql_to_hudi.json
    python bin/datax.py -p"-Ddate='2022-08-25 00:00:00'" job/wlapp_user_mysql_to_hudi.json;
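
    For example (this where clause is my own illustration and is not part of the job file above), the mysqlreader section could use the variable so that each run only picks up rows changed since the given time:

    "where": "update_at >= '${date}'"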
    

    So far it has run without problems in testing; I will keep updating this post if issues come up in production.
