美文网首页
DataX同步ES数据到CK

DataX同步ES数据到CK

作者: 越狱的灵感 | 来源:发表于2022-06-06 09:21 被阅读0次

    前言

    存在一些业务场景,需要离线同步数据到异构数据库,DataX算是一个不错的选择。不过开源版本只支持单进程,多线程,如果是需要多进程,需要业务在reader层面查询的时候就规划好对应进程需要读取的数据分片。

    1.png 2.png

    实践

    当前例子为 elasticsearch 同步数据到 clickhouse
    1,下载DataX源码编译(参考https://github.com/alibaba/DataX/blob/master/userGuid.md
    git clone https://github.com/alibaba/DataX.git
    mvn -U clean package assembly:assembly -Dmaven.test.skip=true
    编译生成文件 datax.tar.gz

    3.png

    2,下载DataX-elasticsearch源码(https://github.com/Kestrong/datax-elasticsearch 这个插件可用支持nested数据结构)编译
    git clone https://github.com/Kestrong/datax-elasticsearch
    mvn -U clean package assembly:assembly -Dmaven.test.skip=true
    生成插件文件

    4.png

    3,将打包好的elasticsearchreader插件发到datax/plugin/reader目录下,压缩并上传到远程机器。


    5.png

    4,校验datax是否可用
    cd datax/bin
    python datax.py -r streamreader -w streamwriter 如果生成了模板文件则表示datax编译包正常。

    5,上传配置文件 es2ck_demo.json

    {"job":{"setting":{"speed":{"channel":40}},"content":[{"reader":{"name":"elasticsearchreader","parameter":{"endpoint":"http://es_ip:19200","accessId":"elastic","accessKey":"xxxxx","index":"oss_storage_cap*","type":"_doc","searchType":"dfs_query_then_fetch","headers":{},"scroll":"3m","discovery":"true","search":[{"query":{"match_all":{}}}],"table":{"name":"es_view_oss_storage_cap","column":[{"name":"user"},{"name":"objectAddr"},{"name":"collectStamp"},{"name":"objectBuckets","alias":"buckets"}]}}},"writer":{"name":"clickhousewriter","parameter":{"batchByteSize":134217728,"batchSize":65536,"column":["user","objectAddr","collectStamp","buckets"],"connection":[{"jdbcUrl":"jdbc:clickhouse://ck_ip:28123/object_log","table":["oss_storage_capacity_all"]}],"dryRun":false,"password":"xxxx","postSql":[],"preSql":[],"username":"default","writeMode":"insert"}}}]}}
    

    6,如果数据量很大,可以指定一个时间段,先同步一批

    ##es 查看指定时间段内的数据
    GET oss_storage_cap*/_count
    {"query":{"bool":{"filter":{"range":{"insert_time":{"from":"2020-05-28T00:00:00.000+0800","to":"2020-05-28T23:59:59.000+0800","include_lower":true,"include_upper":true,"boost":1.0}}}}}}
     
     
    ##修改配置
    "search": [{
        "size": 10000, ##这个参数比较重要,要不然读取速度会很慢
        "query": {
            "bool": {
                "filter": {
                    "range": {
                        "insert_time": {
                            "from": "2020-05-28T00:00:00.000+0800",
                            "to": "2020-05-28T23:59:59.000+0800",
                            "include_lower": true,
                            "include_upper": true,
                            "boost": 1.0
                        }
                    }
                }
            }
        }
    }],
    

    可以看到只同步了指定时间段内的数据


    1.png 2.png

    6,开启同步
    执行 python ../bin/datax.py es2ck_demo.json


    3.png

    Tips

    1,由于es的scroll分页查询,默认查询的条数是number_of_shards*size 建议手动设置size大小,否则同步效果速度很慢,例"search":[{"size":10000,"query":{"match_all":{}}}]
    2,Query_Then_Fetch vs DFS-Query_Then_Fetch https://www.elastic.co/cn/blog/understanding-query-then-fetch-vs-dfs-query-then-fetch dfs方式在做查询更精确,但是如果是导出Query_Then_Fetch速度应该更快

    相关文章

      网友评论

          本文标题:DataX同步ES数据到CK

          本文链接:https://www.haomeiwen.com/subject/qbbxmrtx.html