Flink: Syncing an Array Field to ES with a Custom UDTF


Author: lodestar | Published: 2022-03-07 22:34

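    This walkthrough syncs the MySQL table test to ES and converts the tags column (a comma-separated string) into an array field in ES.
    Structure of the test table in MySQL: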

    
    CREATE TABLE `test` (
        `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
        `name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
        `tags` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
    DEFAULT CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci
    

    Sample data:


    image.png

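    Index mapping in ES. ES has no dedicated array type; a keyword field can hold one or more values, so tags is mapped as keyword: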

    
    PUT info-flow-test3
    {
      "mappings": {
        "properties": {
          "id": {
            "type": "keyword"
          },
          "tags": {
            "type": "keyword"
          }
        }
      }
    }
    

    Table definitions in Flink:

    CREATE TABLE es_info_flow_test3 (
        id string,
        tags ARRAY < string >,
        PRIMARY KEY (id) NOT ENFORCED -- The primary key is optional. If defined, it is used as the document ID; otherwise the document ID is a random value.
      )
    WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = '127.0.0.1:9200',
        'index' = 'info-flow-test3',
        'username' = 'elastic',
        'password' = '123456'
      );
    
    CREATE TABLE mysqlcdc_test (
        id INT, tags string, PRIMARY KEY (id) NOT ENFORCED
      )
    WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '127.0.0.1',
        'port' = '3306',
        'username' = 'admin',
        'password' = '123456',
        'database-name' = 'test_db',
        'table-name' = 'test'
      );
    

    The Flink job script is as follows:

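    -- ASI_UDTF must return the split tags as a single ARRAY<STRING> column to match the sink schema.
    insert into es_info_flow_test3 (id, tags)
    select CAST(src.id as STRING) as id, u.tag_array
    from
      mysqlcdc_test src, lateral table (ASI_UDTF (src.`tags`)) as u (tag_array)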
    

    For how to write the custom UDTF, see the Alibaba Cloud documentation linked below. Note that it must be built with Java 8.
    https://help.aliyun.com/document_detail/188055.html
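    The article does not include the source of ASI_UDTF. As a minimal sketch of the string handling such a function would need, assuming the tags are separated by plain commas and that blank entries should be dropped, the logic might look like the following. TagSplitter and splitTags are hypothetical names, not part of the original job. In an actual Flink TableFunction, an eval method would run this logic and pass the resulting array to collect, so that each source row yields one ARRAY<STRING> value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not taken from the article: the core string handling
// that a UDTF such as ASI_UDTF would need before emitting an ARRAY<STRING>.
class TagSplitter {

    // Splits a comma-separated tag string into an array of trimmed, non-empty tags.
    // A null or blank input yields an empty array instead of an array holding "".
    static String[] splitTags(String tags) {
        List<String> result = new ArrayList<>();
        if (tags != null) {
            for (String part : tags.split(",")) {
                String tag = part.trim();
                if (!tag.isEmpty()) {
                    result.add(tag);
                }
            }
        }
        return result.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Inputs shaped like the tags column of the test table.
        System.out.println(Arrays.toString(splitTags("3,4,5"))); // [3, 4, 5]
        System.out.println(Arrays.toString(splitTags("")));      // []
    }
}
```

    Dropping empty entries matters here because the tags column defaults to '', which would otherwise be synced as an array containing a single empty string.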
    After uploading the JAR, a response like the following indicates that the package was accepted.

    image.png

    Query the index to check how tags is stored:

    POST info-flow-test3/_search
    Response:
    {
      "took" : 0,
      "timed_out" : false,
      "hits" : {
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "info-flow-test3",
            "_type" : "_doc",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : "2",
              "tags" : [
                "3",
                "4",
                "5"
              ]
            }
          },
          {
            "_index" : "info-flow-test3",
            "_type" : "_doc",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : "3",
              "tags" : [
                "6"
              ]
            }
          }
        ]
      }
    }
    
    


    Article link: https://www.haomeiwen.com/subject/nurnrrtx.html