Real-Time Synchronization from OGG to Kafka

Author: 围着火炉切水果 | Published: 2019-06-28 21:58

Target side (Linux)

Install OGG for Big Data

Unzip the OGG_BigData archive

unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip

Then extract the resulting tar file

tar -vxf OGG_BigData_Linux_x64_12.3.2.1.1.tar

Install Java 1.8

See the Oracle OGG installation requirements:
https://docs.oracle.com/goldengate/bd123110/gg-bd/GBDIG/installing-oracle-goldengate-big-data.htm#GBDIG-GUID-8A857DA7-38DE-48DC-8946-FB590AFCD6C0

The Oracle GoldenGate for Big Data are certified for Java 1.8. Before installing and running Oracle GoldenGate for Java, you must install Java (JDK or JRE) version 1.8 or later. Either the Java Runtime Environment (JRE) or the full Java Development Kit (which includes the JRE) may be used.

Java 1.8 download page:
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Configure environment variables

vi /etc/profile
export JAVA_HOME=/opt/java/jdk1.8.0_211
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
source /etc/profile
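
The dynamic linker treats every LD_LIBRARY_PATH entry as a directory to search, so an entry that names a .so file directly contributes nothing; libjvm.so and libjsig.so are found through their parent directories. As a minimal sketch (the helper name file_like_entries is hypothetical, and it only inspects the string, not the filesystem), the following Python flags entries that look like files:

```python
def file_like_entries(ld_library_path):
    """Return entries that look like shared-object files rather than directories."""
    entries = [e for e in ld_library_path.split(":") if e]
    flagged = []
    for entry in entries:
        basename = entry.rsplit("/", 1)[-1]
        # Matches "libjvm.so" as well as versioned names such as "libfoo.so.1".
        if basename.endswith(".so") or ".so." in basename:
            flagged.append(entry)
    return flagged


if __name__ == "__main__":
    # Sample value using the paths from this article.
    sample = (
        "/opt/java/jdk1.8.0_211/jre/lib/amd64:"
        "/opt/java/jdk1.8.0_211/jre/lib/amd64/server:"
        "/opt/java/jdk1.8.0_211/jre/lib/amd64/server/libjvm.so:"
        "/opt/ogg/lib"
    )
    for entry in file_like_entries(sample):
        print("not a directory:", entry)
```

Running it against the output of echo $LD_LIBRARY_PATH is a quick way to spot entries worth removing.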

Initialize OGG

cd /opt/ogg
ggsci
create subdirs
edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

Set the checkpoint table

edit  param  ./GLOBALS

CHECKPOINTTABLE test_ogg.checkpoint

Configure the replicat process

edit param rekafka

REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

Configure kafka.props

cd /opt/ogg/dirprm/
vi kafka.props

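# Delete the trailing // annotations below before use (properties files have no inline comments), and watch for stray spaces
gg.handlerlist=kafkahandler //handler name
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //Kafka producer settings
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //Kafka topic name; it does not need to be created manually
gg.handler.kafkahandler.format=json //message format; json, xml, etc. are supported
gg.handler.kafkahandler.mode=op  //delivery mode in OGG for Big Data: op sends one message per SQL operation, tx sends one per transaction
gg.classpath=dirprm/:/opt/kafka/kafka_2.12-2.3.0/libs/*:/opt/ogg/:/opt/ogg/lib/*
vi custom_kafka_producer.properties
# Change the IP address below to that of your Kafka broker
bootstrap.servers=192.168.44.129:9092 //Kafka broker address
acks=1
compression.type=gzip //compression type
reconnect.backoff.ms=1000 //reconnect delay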
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
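
Java properties files only recognize comments on lines that start with # or !, so a trailing // annotation would be read as part of the value. As a minimal sketch (the function name strip_inline_notes is hypothetical), the following Python removes such annotations along with the spaces in front of them. It only strips // that is preceded by whitespace, so values containing :// are left alone.

```python
import re

# Whitespace followed by "//" and the rest of the line. Requiring the leading
# whitespace keeps values such as "PLAINTEXT://host:9092" intact.
_INLINE_NOTE = re.compile(r"\s+//.*$")


def strip_inline_notes(text):
    """Remove trailing ' //...' annotations and trailing spaces from each line."""
    cleaned = []
    for line in text.splitlines():
        if line.lstrip().startswith("#"):
            cleaned.append(line)  # genuine properties comments are left untouched
            continue
        cleaned.append(_INLINE_NOTE.sub("", line).rstrip())
    return "\n".join(cleaned)


if __name__ == "__main__":
    sample = (
        "gg.handlerlist=kafkahandler //handler name\n"
        "gg.handler.kafkahandler.mode=op  //op or tx\n"
        "gg.handler.kafkahandler.type=kafka\n"
    )
    print(strip_inline_notes(sample))
```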

Attach the trail file to the replicat process

add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint

Install Kafka

cd /opt/kafka
tar -vxf kafka_2.12-2.3.0.tgz

Edit the configuration file

cd /opt/kafka/kafka_2.12-2.3.0/config

vi server.properties
listeners=PLAINTEXT://192.168.208.153:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.208.153:9092
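
A producer first connects to the address in bootstrap.servers and is then directed to the addresses the broker advertises, so the two settings need to agree for a single-broker setup like this one. As a rough sketch (all function names here are hypothetical, and the parser handles only plain key=value lines), the following Python reports bootstrap.servers entries that the broker does not advertise. A mismatch is not always an error, for example in a multi-broker cluster, but it is worth checking.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def endpoints(value):
    """Turn 'PLAINTEXT://h:9092,h2:9093' into {'h:9092', 'h2:9093'}."""
    return {item.split("://", 1)[-1].strip() for item in value.split(",") if item.strip()}


def unadvertised_bootstrap_servers(server_props_text, producer_props_text):
    """Return bootstrap.servers entries missing from the broker's advertised listeners."""
    server = parse_properties(server_props_text)
    producer = parse_properties(producer_props_text)
    # Per the comment in server.properties, advertised.listeners falls back to listeners.
    advertised = endpoints(server.get("advertised.listeners", server.get("listeners", "")))
    wanted = endpoints(producer.get("bootstrap.servers", ""))
    return sorted(wanted - advertised)


if __name__ == "__main__":
    server_text = (
        "listeners=PLAINTEXT://192.168.208.153:9092\n"
        "advertised.listeners=PLAINTEXT://192.168.208.153:9092\n"
    )
    producer_text = "bootstrap.servers=192.168.44.129:9092\n"
    print(unadvertised_bootstrap_servers(server_text, producer_text))
```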

Start Kafka

cd /opt/kafka/kafka_2.12-2.3.0
bin/zookeeper-server-start.sh config/zookeeper.properties
# Open another terminal window
cd /opt/kafka/kafka_2.12-2.3.0
bin/kafka-server-start.sh config/server.properties

Source database (Windows)

Switch the database to archivelog mode

conn / as sysdba
shutdown immediate
startup mount
alter database archivelog;
alter database open;
alter database force logging;
alter database add supplemental log data;
alter system switch logfile;
alter system set recyclebin=off scope=spfile;

Create the replication user

create user ogg identified by oracle;
grant dba to ogg;

Create the test table

conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));

Initialize OGG

OGG is installed in c:\ogg

ggsci
create subdirs

Configure the extract

edit param ext
extract ext
--dynamicresolution
SETENV(ORACLE_HOME='D:\app\Administrator\product\11.2.0\dbhome_1') 
setenv (NLS_LANG="american_america.AL32UTF8")
SETENV (ORACLE_SID="SWDS") 
--ddl include all
userid ogg,password oracle 
eofdelaycsecs 1
flushcsecs 1
--gettruncates
rmthost 192.168.208.153, mgrport 7809
rmttrail /opt/ogg/dirdat/to
--br broff
table test_ogg.test_ogg;
dblogin userid ogg,password oracle
add extract ext,tranlog,begin now
add rmttrail /opt/ogg/dirdat/to,extract ext

Add the table to be replicated

dblogin userid ogg password oracle
add trandata test_ogg.test_ogg
info trandata test_ogg.test_ogg

Configure the definitions file

edit param test_ogg
defsfile C:\ogg\dirdef\test_ogg.test_ogg
userid ogg,password oracle
table test_ogg.test_ogg;

Generate the definitions file

C:\ogg\defgen.exe paramfile C:\ogg\dirprm\test_ogg.prm

Copy the generated file to /opt/ogg/dirdef/ on the Kafka side

Configure mgr

edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

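Startup

Start all processes: in GGSCI, run start mgr on both hosts, then start extract ext on the source and start replicat rekafka on the target.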

Test: source side

conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

Target side

cd /opt/kafka/kafka_2.12-2.3.0/

bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.129:9092 --topic test_ogg --from-beginning
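
With mode=op and format=json, each insert, update, and delete should arrive as its own JSON message. The sketch below replays such messages into an in-memory dict. It assumes each record carries op_type (I, U, or D) plus before and after row images, which is how the JSON formatter output is expected to look; the sample messages and the apply_change helper are hypothetical, so compare the field names with what the console consumer actually prints.

```python
import json


def apply_change(table, record_json, key="ID"):
    """Apply one op-mode change record to an in-memory {key value: row} dict."""
    record = json.loads(record_json)
    op = record["op_type"]
    if op == "I":
        row = record["after"]
        table[row[key]] = row
    elif op == "U":
        before = record.get("before", {})
        after = record["after"]
        old_key = before.get(key, after.get(key))
        # Merge images so columns missing from the after image are preserved.
        row = {**before, **table.pop(old_key, {}), **after}
        table[row[key]] = row
    elif op == "D":
        table.pop(record["before"][key], None)
    else:
        raise ValueError("unsupported op_type: %r" % op)
    return table


if __name__ == "__main__":
    # Hypothetical messages mirroring the insert/update/delete test on the source.
    messages = [
        '{"table": "TEST_OGG.TEST_OGG", "op_type": "I", "after": {"ID": 1, "NAME": "test"}}',
        '{"table": "TEST_OGG.TEST_OGG", "op_type": "U", "before": {"ID": 1, "NAME": "test"}, "after": {"ID": 1, "NAME": "zhangsan"}}',
        '{"table": "TEST_OGG.TEST_OGG", "op_type": "D", "before": {"ID": 1, "NAME": "zhangsan"}}',
    ]
    state = {}
    for message in messages:
        apply_change(state, message)
        print(state)
```

If the replayed state ends up empty after the delete, the three test statements all made it through the pipeline.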

Pitfalls

enable_goldengate_replication is not set to true

2015-08-24 14:39:21 ERROR OGG-02091 Operation not supported because enable_goldengate_replication is not set to true.

SQL> alter system set enable_goldengate_replication=true;

Reference
https://www.jianshu.com/p/c06c9b0ff2f0
Once the setup is complete, check whether any other pitfalls remain.

Article link: https://www.haomeiwen.com/subject/vacgqctx.html