美文网首页
Datax 自定义函数 dx_groovy

Datax 自定义函数 dx_groovy

作者: Skye_kh | 来源:发表于2017-09-16 10:01 被阅读391次

    DataX 是阿里云开源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、MaxCompute(原ODPS)、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

    Datax 的数据转换支持 UserDefined Function

    官方的使用说明如下:

    dx_groovy

    • 参数。
      • 第一个参数: groovy code
      • 第二个参数(列表或者为空):extraPackage
    • 备注:
      • dx_groovy只能调用一次。不能多次调用。
      • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
      • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
      • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):
    • 举例:
    groovy 实现的subStr:
            String code = "Column column = record.getColumn(1);\n" +
                    " String oriValue = column.asString();\n" +
                    " String newValue = oriValue.substring(0, 3);\n" +
                    " record.setColumn(1, new StringColumn(newValue));\n" +
                    " return record;";
            dx_groovy(record);
    
    groovy 实现的Replace
    String code2 = "Column column = record.getColumn(1);\n" +
                    " String oriValue = column.asString();\n" +
                    " String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
                    " record.setColumn(1, new StringColumn(newValue));\n" +
                    " return record;";
    
    groovy 实现的Pad
    String code3 = "Column column = record.getColumn(1);\n" +
                    " String oriValue = column.asString();\n" +
                    " String padString = \"12345\";\n" +
                    " String finalPad = \"\";\n" +
                    " int NeedLength = 8 - oriValue.length();\n" +
                    "        while (NeedLength > 0) {\n" +
                    "\n" +
                    "            if (NeedLength >= padString.length()) {\n" +
                    "                finalPad += padString;\n" +
                    "                NeedLength -= padString.length();\n" +
                    "            } else {\n" +
                    "                finalPad += padString.substring(0, NeedLength);\n" +
                    "                NeedLength = 0;\n" +
                    "            }\n" +
                    "        }\n" +
                    " String newValue= finalPad + oriValue;\n" +
                    " record.setColumn(1, new StringColumn(newValue));\n" +
                    " return record;";
    
    • 以下是我自己实现的字符串替换 dx_groovy 函数
      {
      "job": {
      "content": [
      {
      "reader": {
      "name": "mysqlreader",
      "parameter": {
      "column": ["user_role_id","username","ROLE"],
      "connection": [
      {
      "jdbcUrl": ["jdbc:mysql://10.1.18.155:3306/saiku_test"],
      "table": ["user_roles_copy"]
      }
      ],
      "password": "123456",
      "username": "root",
      "where": ""
      }
      },
      "writer": {
      "name": "mysqlwriter",
      "parameter": {
      "column": ["user_role_id","username","ROLE"],
      "connection": [
      {
      "jdbcUrl": "jdbc:mysql://10.1.18.155:3306/saiku_test",
      "table": ["user_roles_trans"]
      }
      ],
      "password": "123456",
      "preSql": [],
      "session": [],
      "username": "root",
      "writeMode": "insert"
      }
      },
      "transformer": [
      {
      "name": "dx_filter",
      "parameter":
      {
      "columnIndex":1,
      "paras":["=","null"]
      }
      },{
      "name": "dx_groovy",
      "parameter":
      {
      "code": "Column column = record.getColumn(2);\nString oriValue = column.asString();\nString sourceString = "ROLE_ADMIN";\nString changeString = "replaceTest";\nif (oriValue.equals(sourceString)){record.setColumn(2, new StringColumn(changeString));\nreturn record;}\nreturn record;",
      "extraPackage":[]
      }
      }
      ]
      }
      ],
      "setting": {
      "speed": {
      "channel": "1"
      }
      }
      }
      }

    相关文章

      网友评论

          本文标题:Datax 自定义函数 dx_groovy

          本文链接:https://www.haomeiwen.com/subject/nfsysxtx.html