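Preface
Some business scenarios require offline synchronization of data to a heterogeneous database, and DataX is a good choice for this. The open-source version, however, runs only as a single process with multiple threads. If you need multiple processes, you have to plan at the reader level, when querying, which data shard each process will read.
Practice
The example here syncs data from elasticsearch to clickhouse.
1. Download and build the DataX source (see https://github.com/alibaba/DataX/blob/master/userGuid.md)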
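One way to plan the shards for several processes is to cut the query's time range into contiguous slices and give each process one slice. The following is a minimal sketch under that assumption; the function name split_time_range and the choice of four slices are illustrative and are not part of DataX.

```python
from datetime import datetime


def split_time_range(start, end, parts):
    """Split [start, end) into `parts` contiguous, non-overlapping slices."""
    if parts < 1:
        raise ValueError("parts must be >= 1")
    if end <= start:
        raise ValueError("end must be after start")
    step = (end - start) / parts  # timedelta divided by int
    bounds = [start + step * i for i in range(parts)] + [end]
    # Pair each boundary with the next one: (b0, b1), (b1, b2), ...
    return list(zip(bounds[:-1], bounds[1:]))


# Hypothetical example: one day split across four reader processes.
slices = split_time_range(datetime(2020, 5, 28), datetime(2020, 5, 29), 4)
for lo, hi in slices:
    print(lo.isoformat(), "->", hi.isoformat())
```

Each slice shares its upper bound with the next slice's lower bound, so the per-process query should include the lower bound and exclude the upper bound to avoid reading a document twice.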
git clone https://github.com/alibaba/DataX.git
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
The build produces the file datax.tar.gz.
2. Download and build the DataX-elasticsearch source (https://github.com/Kestrong/datax-elasticsearch; this plugin supports nested data structures)
git clone https://github.com/Kestrong/datax-elasticsearch
cd datax-elasticsearch
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
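This generates the plugin files.
3. Copy the packaged elasticsearchreader plugin into the datax/plugin/reader directory, then compress datax and upload it to the remote machine.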
4. Verify that DataX works
cd datax/bin
python datax.py -r streamreader -w streamwriter
If this prints a job template, the DataX build is working.
5. Upload the configuration file es2ck_demo.json
{"job":{"setting":{"speed":{"channel":40}},"content":[{"reader":{"name":"elasticsearchreader","parameter":{"endpoint":"http://es_ip:19200","accessId":"elastic","accessKey":"xxxxx","index":"oss_storage_cap*","type":"_doc","searchType":"dfs_query_then_fetch","headers":{},"scroll":"3m","discovery":"true","search":[{"query":{"match_all":{}}}],"table":{"name":"es_view_oss_storage_cap","column":[{"name":"user"},{"name":"objectAddr"},{"name":"collectStamp"},{"name":"objectBuckets","alias":"buckets"}]}}},"writer":{"name":"clickhousewriter","parameter":{"batchByteSize":134217728,"batchSize":65536,"column":["user","objectAddr","collectStamp","buckets"],"connection":[{"jdbcUrl":"jdbc:clickhouse://ck_ip:28123/object_log","table":["oss_storage_capacity_all"]}],"dryRun":false,"password":"xxxx","postSql":[],"preSql":[],"username":"default","writeMode":"insert"}}}]}}
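Since the reader renames objectBuckets to buckets through alias, a quick check that the reader's output columns line up with the writer's column list can catch mapping mistakes before a long sync. The sketch below assumes the plugin emits the alias when one is set and the field name otherwise, and that columns are matched by position; the helper names are hypothetical and the sample is a trimmed copy of the job above.

```python
def reader_output_columns(job):
    """Column names the reader emits: alias if present, else name (assumed behaviour)."""
    reader = job["job"]["content"][0]["reader"]["parameter"]
    return [col.get("alias", col["name"]) for col in reader["table"]["column"]]


def writer_columns(job):
    """Column list configured on the writer, in order."""
    return list(job["job"]["content"][0]["writer"]["parameter"]["column"])


def check_columns(job):
    """Raise if reader and writer columns differ in name or order."""
    r, w = reader_output_columns(job), writer_columns(job)
    if r != w:
        raise ValueError(f"column mismatch: reader={r} writer={w}")
    return r


# Trimmed copy of es2ck_demo.json, keeping only the column definitions.
sample = {
    "job": {"content": [{
        "reader": {"parameter": {"table": {"column": [
            {"name": "user"},
            {"name": "objectAddr"},
            {"name": "collectStamp"},
            {"name": "objectBuckets", "alias": "buckets"},
        ]}}},
        "writer": {"parameter": {
            "column": ["user", "objectAddr", "collectStamp", "buckets"],
        }},
    }]}
}

columns = check_columns(sample)
print(columns)
```

For the real file, the same check could be run on the result of json.load applied to es2ck_demo.json.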
6. If the data volume is large, specify a time range and sync one batch first
## ES: count the documents within the specified time range
GET oss_storage_cap*/_count
{"query":{"bool":{"filter":{"range":{"insert_time":{"from":"2020-05-28T00:00:00.000+0800","to":"2020-05-28T23:59:59.000+0800","include_lower":true,"include_upper":true,"boost":1.0}}}}}}
## Modify the configuration
"search": [{
  "size": 10000,  ## this parameter matters; without it reads are very slow
  "query": {
    "bool": {
      "filter": {
        "range": {
          "insert_time": {
            "from": "2020-05-28T00:00:00.000+0800",
            "to": "2020-05-28T23:59:59.000+0800",
            "include_lower": true,
            "include_upper": true,
            "boost": 1.0
          }
        }
      }
    }
  }
}],
The result shows that only data within the specified time range was synced.
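When syncing in several batches, the search entry for each time window can be generated instead of edited by hand. This is a minimal sketch that reproduces the structure of the fragment above; build_search is a hypothetical helper, and the field name insert_time and size 10000 are taken from this example.

```python
import json


def build_search(start, end, size=10000, field="insert_time"):
    """Build the reader's "search" list for one inclusive time window."""
    return [{
        "size": size,  # documents per scroll batch
        "query": {"bool": {"filter": {"range": {field: {
            "from": start,
            "to": end,
            "include_lower": True,
            "include_upper": True,
            "boost": 1.0,
        }}}}},
    }]


search = build_search(
    "2020-05-28T00:00:00.000+0800",
    "2020-05-28T23:59:59.000+0800",
)
print(json.dumps(search, indent=2))
```

The printed JSON can be pasted into the reader's parameter block, or assigned to the search key of a job loaded with json.load before writing it back out.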
7. Start the sync
Run python ../bin/datax.py es2ck_demo.json
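If the data has been split into several job files, one DataX process can be started per file to get multi-process behaviour. The sketch below only wraps the command shown above in subprocess; the job file names passed to run_jobs would be your own (names such as es2ck_part0.json are hypothetical), and the python executable name is assumed to be the one that runs datax.py on your machine.

```python
import subprocess


def datax_command(job_file, datax_py="../bin/datax.py", python="python"):
    """Build the argument list for one DataX run."""
    return [python, datax_py, job_file]


def run_jobs(job_files, datax_py="../bin/datax.py"):
    """Start one DataX process per job file, wait for all, return exit codes."""
    procs = [subprocess.Popen(datax_command(j, datax_py)) for j in job_files]
    return [p.wait() for p in procs]


# Only builds and prints the command; call run_jobs([...]) to launch processes.
cmd = datax_command("es2ck_demo.json")
print(" ".join(cmd))
```

A non-zero entry in the list returned by run_jobs indicates that the corresponding job should be inspected and rerun.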
Tips
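1. Because the reader pages through ES with scroll queries, the number of documents fetched per request defaults to number_of_shards*size. Set size explicitly, otherwise the sync is very slow, e.g. "search":[{"size":10000,"query":{"match_all":{}}}]
2. Query_Then_Fetch vs DFS-Query_Then_Fetch (https://www.elastic.co/cn/blog/understanding-query-then-fetch-vs-dfs-query-then-fetch): the DFS mode scores queries more accurately, but for an export Query_Then_Fetch should be faster.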
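To see why the size setting in tip 1 matters, it helps to count the scroll round trips needed for an export. The sketch below assumes each scroll request returns a fixed number of documents; the total of 1,000,000 documents is a made-up figure, and 10 is the Elasticsearch default for size.

```python
import math


def scroll_requests(total_docs, docs_per_batch):
    """Number of scroll round trips needed to read total_docs documents."""
    if docs_per_batch < 1:
        raise ValueError("docs_per_batch must be >= 1")
    return math.ceil(total_docs / docs_per_batch)


# Hypothetical index with one million documents.
with_default = scroll_requests(1_000_000, 10)      # ES default size
with_tuned = scroll_requests(1_000_000, 10_000)    # size used in this post
print(with_default, with_tuned)
```

Under these assumptions the tuned setting needs 100 round trips instead of 100,000, which is where most of the speedup comes from.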