kettle 同步Oracle 与 Postgres
分类专栏: Kettle
环境: PDI 8.2 ,windows, oracle 12C, postgres 12
整个过程性能很好,10分钟+1分钟完成。接下是动态更新任务。
在spoon上,建立一个数据库连接postgres_150,并共享出来。
建议使用jndi的方式,下面更新后,需要重启spoon(这是它的缺点)
simple-jndi/jdbc.properties的设置如下:
postgres_150/type=javax.sql.DataSource
postgres_150/driver=org.postgresql.Driver
postgres_150/url=jdbc:postgresql://ip:ports/yourdatabase
postgres_150/user=username
postgres_150/password=****
1.建立时间中间表
createtableD_BZDZ_MLP_TIMES
(
idNUMERICnotnull,
last_loadTIMESTAMP(6),
current_loadTIMESTAMP(6)
)
插入初始化的数据
INSERTINTOtopology.d_bzdz_mlp_times(
id, last_load, current_load)
VALUES(1, to_timestamp('1971-01-01 01:01:01','YYYY-MM-DD HH24:MI:SS'),
to_timestamp('1971-02-02 01:01:01','YYYY-MM-DD HH24:MI:SS') );
# 或者
INSERTINTOtopology.d_bzdz_mlp_times(
id, last_load, current_load)
VALUES(1,'1971-01-01 01:01:01'::timestamp,'1971-01-01 01:01:01'::timestamp);
2..先把数据一次性从Oralce导入postgres,采用表输入和表输出
在表输出中通过SQL建立表,添加gemo字段,注意Oracle与postgres数据类型的不一样。
如果目标表和源表的字段类型不一致,需要在select * ^语句中转换,比如to_number("string")把字符串转成数字
createtableD_BZDZ_MLP_new
(
systemidVARCHAR(50),
sssqcjwhdmVARCHAR(13),
ssjlxdmVARCHAR(20),
sspcsdmVARCHAR(12),
dzxxdTEXT,
........... ......
zxjdNUMERIC(30,20),
zxwdNUMERIC(30,20),
geom GEOMETRY(Point,4326),
zxztTEXT,
smztTEXT,
sffwTEXT,
uuidVARCHAR(50),
cccjsjTIMESTAMP(6),
lastupdatedtimeTIMESTAMP(6)
)
Postgres中对表建立索引
不建立索引的话,后面的插入/更新转换步骤会非常慢。
因为插入/更新都需要进行select操作(这里是select systemid ***),再决定是插入还是更新。
createindexd_bzdz_mlp_idx_sysidond_bzdz_mlp(systemid);
然后建立一个转换,从Oralce中输入,输出到Postgres,一次性批量输出数据。
具体步骤包括:获取系统时间、更新时间中间表、获取时间中间表数据、查询数据、表输出。
记下上面同步完毕的时间,对时间中间表进行更新:
updatetopology.D_MLP_TIMESsetlast_load = current_loadwhereid=1;
3.建立四个转换,用一个作业把这四个转换运转起来。
3.1时间同步转换
从systemdate 获取当前时间,插入更新时间中间表的 当前时间
3.2 数据同步转换。
按照时间中间表,从oracle中查询变化的数据
SELECT
LAST_LOAD last_load
, CURRENT_LOAD current_load
FROMtopology.D_BZDZ_MLP_TIMES
last_load 和 current_load 作为参数传入下面的查询
SELECT
systemid,
.....
zxjd,
zxwd,
......
cccjsj,
lastupdatedtime
FROMLG.D_MLP
WHERE(x >112.916andx <114.082andy>22.526andy>24.005)ANDlastupdatedtime >= ?ANDlastupdatedtime < ?
把上面表输入的数据,插入/更新到postgres
以systemid作为查询的关键字
3.3 时间中间表同步转换
这是一个SQL脚本转换
updatetopology.D_BZDZ_MLP_TIMESsetlast_load = current_loadwhereid=1;
3.4 更新geom字段
update tablesetgeom=ST_SetSRID(st_point(x,y),4326)wheregeom is null ;
网友评论