美文网首页
DataX3.0介绍

DataX3.0介绍

作者: peiwj1993 | 来源:发表于2018-04-16 22:02 被阅读0次

    概览

    datax是一个异构数据源离线同步工具,主要实现包括关系型数据库(MySQL、Oracle等)、MongoDB、Hive、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

    DataX3.0框架设计

    image
    DataX采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,Reader从源数据库中读取数据转换成datax内部的数据格式,Writer从datax中把数据读出来并且转换成目的端的数据格式。
    Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
    Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
    Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

    应用场景

    通常是在服务暂停的情况下,短时间将一份数据从一个数据库迁移至其他不同类型的数据库。
    优点:
    1.提供了数据监控
    2.丰富的数据转换功能,可以重新定制Reader,Writer插件实现数据脱敏,补全,过滤等数据转换功能
    3.可以在配置文件中配置精确的速度控制
    4.强劲的同步性能,支持多线程操作,可以快速迁移数据
    5.健壮的容错机制,支持线程重试等机制,可以保证迁移过程稳定执行
    缺点:
    1.数据一致性问题。这个工具强烈建议在服务暂停或者禁止执行写操作的情况下使用。如果在迁移的过程中还有写操作的话,这些增量数据无法实时从源数据库同步到目的数据库,无法保证迁移前后数据一致性。

    使用方式

    1.安装python环境
    2.下载工具包并解压:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
    3.写好json格式的配置文件
    4.进入工具包解压目录下的bin目录,执行命令:python datax.py 你的配置文件

    配置文件的实例:

    {
        "job": {
            "setting": {
                //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它
                //channel设置并发程度
                "speed": {
                     "byte": 1048576,
                     "channel":"5",                 
                }
            },
            "content": [
                {
                    //Reader配置信息,包括源数据库配置信息,需要迁移的表以及表中相关的列
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "root",
                            "password": "Test12345@",
                            "column": ["*"],
                            "connection": [
                                {
                                    "table": ["test_table"],
                                    "jdbcUrl": ["jdbc:mysql://192.168.0.115:3306/test"]
                                }
                            ]
                        }
                    },
                    //writer配置信息,包括目的数据库配置信息,需要写入迁移数据的表,以及表中相关的列
                   "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "writeMode": "insert",
                            "username": "test",
                            "password": "123456",
                            "column":["*"],
                            "connection": [
                                {
                                    "table": ["test_table"],
                                   "jdbcUrl":"jdbc:mysql://192.168.0.110:3306/test?useUnicode=true&characterEncoding=utf8"                                
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    

    1.一个job对应的一个迁移任务,可以理解为一个job就是对一张表的迁移
    2.一个job会根据上述配置文件的channel配置切分为多个task去执行迁移操作
    3.datax支持分表分库后的数据迁移,上述的jdbcUrl可以配置多个url
    4.datax支持大部分数据库的迁移操作,目前支持的reader,writer可以参考:https://github.com/alibaba/DataX/blob/master/introduction.md
    5.关系型数据库还支持直接根据sql语句来进行数据迁移同步

    插件开发

    datax支持writer和reader的定制,实现方法如下(以mysql为例):
    1.下载datax项目,新建module子模块
    2.编写plugin.json

    {
        "name": "mysqlreader",//reader名称
        "class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",//对应的实现类
        "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
        "developer": "alibaba"
    }
    

    3.编写实现类,主要是继承抽象类Reader,实现Job、Task两个内部类,Job对应上面配置文件的Job,主要是一些配置信息,数据传输相关操作是实现Task中startRead方法

      public void startRead(Configuration readerSliceConfig,
                                  RecordSender recordSender,
                                  TaskPluginCollector taskPluginCollector, int fetchSize) {
                String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
                String table = readerSliceConfig.getString(Key.TABLE);
                .....省略部分代码.....
                //获取数据库连接
                Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,username, password);
                .....省略部分代码.....
                ResultSet rs = null;
                try {
                    //根据配置的表名或者sql语句从数据库中读出数据
                    rs = DBUtil.query(conn, querySql, fetchSize);
                    queryPerfRecord.end();
                    ResultSetMetaData metaData = rs.getMetaData();
                    columnNumber = metaData.getColumnCount();
                    .....省略部分代码.....
                    while (rs.next()) {
                        //将结果集中的每一行记录转变为datax内部数据类型(Long,Double,String,Date,Boolean,Bytes),并发送到framework中
                        Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 
                        recordSender.sendToWriter(record);
                    }
                }catch (Exception e) {
                    throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
                } finally {
                    DBUtil.closeDBResources(null, conn);
                }
            }
    

    4.将生成的jar包和配置文件放到安装目录下面的plugin目录下即可

    相关文章

      网友评论

          本文标题:DataX3.0介绍

          本文链接:https://www.haomeiwen.com/subject/kcdykftx.html