美文网首页
OGG实时同步数据到kafka配置

OGG实时同步数据到kafka配置

作者: 献给记性不好的自己 | 来源:发表于2017-12-06 15:18 被阅读639次

    一、源端(oracle数据库端)---同这篇文章源端配置http://www.jianshu.com/p/53882229b70e

    #Extract进程
    #一定要记得同步之前要开启表的全补充日志
    #alter table tb_name add supplemental log data (all) columns;
    GGSCI (zwjfdb3) 7> view param EZWJFBOR
    
    EXTRACT EZWJFBOR
    SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
    SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")
    SETENV (ORACLE_SID = "zwjfdb3")
    --捕获 truncate 操作
    gettruncates
    --定义discardfile文件位置,如果处理中有记录出错会写入到此文件中
    DISCARDFILE ./dirrpt/ezwjfbor.dsc, APPEND, MEGABYTES 1024
    --动态解析表名
    DYNAMICRESOLUTION
    --获取更新之前数据
    GETUPDATEBEFORES
    --当抽取进程遇到一个没有使用的字段时只生成一个警告,进程会继续执行而不会被异常终止(abend)
    DBOPTIONS  ALLOWUNUSEDCOLUMN
    --每隔30分钟报告一次从程序开始到现在的抽取进程或者复制进程的事物记录数,并汇报进程的统计信息
    REPORTCOUNT EVERY 30 MINUTES, RATE
    --每隔3分钟检查一下大事务,超过2小时还没结束的进行报告
    WARNLONGTRANS 2h,CHECKINTERVAL 3m
    --不会从闪回日志中获取数据
    FETCHOPTIONS NOUSESNAPSHOT
    USERID xxxxxx,PASSWORD xxxxxx
    EXTTRAIL ./dirdat/zb
    TABLE xx.xx;
    TABLE xx.xx;
    
    #添加抽取进程
    GGSCI (zwjfdb3) 11> add extract EZWJFBOR,TRANLOG, BEGIN NOW
    EXTRACT added.
    #定义trail文件
    GGSCI (zwjfdb3) 12> ADD EXTTRAIL ./dirdat/zb,EXTRACT EZWJFBOR, MEGABYTES 200
    EXTTRAIL added.
    
    #pump extract进程
    GGSCI (zwjfdb3) 8> view param PZWJFBOR
    EXTRACT PZWJFBOR
    SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
    PASSTHRU
    DYNAMICRESOLUTION
    RMTHOST xx.xx.xx.xx,MGRPOT 7809 
    RMTTRAIL ./dirdat/zb
    TABLE xx.xx;
    TABLE xx.xx;
    #添加pump捕获组
    GGSCI (zwjfdb3) 23> ADD EXTRACT PZWJFBOR,EXTTRAILSOURCE ./dirdat/zb
    EXTRACT added.
    #定义pump trail文件
    GGSCI (zwjfdb3) 25> ADD RMTTRAIL ./dirdat/zb,EXTRACT PZWJFBOR, MEGABYTES 200
    RMTTRAIL added.
    
    #启动进程
    GGSCI (zwjfdb3) 8> start EXTRACT EZWJFBOR
    GGSCI (zwjfdb3) 8> start EXTRACT PZWJFBOR
    GGSCI (zwjfdb3) 9> info all
    
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
    
    MANAGER     RUNNING                                           
    EXTRACT     RUNNING     EZWJFBOR    00:00:00      00:00:08    
    EXTRACT     RUNNING     PZWJFBOR    00:00:00      00:00:16 
    
    #传递表结构
    GGSCI (zwjfdb3) 4> view param defgen
    DEFSFILE dirdef/source.def, PURGE
    USERID xxxxxx, PASSWORD xxxx
    TABLE xx.xx;
    TABLE xx.xx;
    [oracle@zwjfdb3 12.2]$ ./defgen paramfile dirprm/defgen.prm 
    
    ***********************************************************************
            Oracle GoldenGate Table Definition Generator for Oracle
     Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258
       Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 16:58:29
     
    Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
    
    
                        Starting at 2017-12-05 16:21:03
    ***********************************************************************
    
    Operating System Version:
    Linux
    Version #1 SMP Thu Feb 23 03:04:39 UTC 2017, Release 3.10.0-514.6.2.el7.x86_64
    Node: zwjfdb3
    Machine: x86_64
                             soft limit   hard limit
    Address Space Size   :    unlimited    unlimited
    Heap Size            :    unlimited    unlimited
    File Size            :    unlimited    unlimited
    CPU Time             :    unlimited    unlimited
    
    Process id: 375
    
    ***********************************************************************
    **            Running with the following parameters                  **
    ***********************************************************************
    把./dirdef/source.def文件拷贝到目标端的./dirdef目录下
    
    

    二、目标端(kafka)

    #java必须为1.8
    [root@bigdata01 ~]$ java -version
    java version "1.8.0_45"
    Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
    Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
    [root@bigdata01 ~]$ 
    #创建ogg管理用户,并更改ogg安装路径权限
    [root@bigdata01 ~]$ groupadd ogg
    [root@bigdata01 ~]$ useradd -g ogg ogg
    [root@bigdata01 ~]$ chown -R ogg:ogg  /opt/ogg
    #上传并解压ggs_Adapters_Linux_x64.tar到相应目录
    [ogg@bigdata01 ogg]$ pwd
    /opt/ogg
    [ogg@bigdata01 ogg]$ ll
    total 582548
    drwxr-xr-x 6 ogg ogg     4096 Jun  7  2016 AdapterExamples
    -rw-r----- 1 ogg ogg      426 Oct 15  2010 bcpfmt.tpl
    -rw-r----- 1 ogg ogg     1725 Oct 15  2010 bcrypt.txt
    -rwxrwxr-x 1 ogg ogg  8557335 May  1  2016 cachefiledump
    -rwxrwxr-x 1 ogg ogg  8730645 May  1  2016 checkprm
    -rwxr-x--- 1 ogg ogg  9567306 May  1  2016 convchk
    -rwxrwxr-x 1 ogg ogg 15019428 May  1  2016 convprm
    -rw-r----- 1 ogg ogg      159 Oct 15  2010 db2cntl.tpl
    drwxr-x--- 2 ogg ogg     4096 Dec  5 14:05 dirchk
    drwxr-x--- 2 ogg ogg     4096 Dec  5 14:05 dircrd
    
    #配置环境变量
    [ogg@bigdata01 ~]$ cat ~/.bash_profile 
    # .bash_profile
    
    # Get the aliases and functions
    if [ -f ~/.bashrc ]; then
        . ~/.bashrc
    fi
    
    # User specific environment and startup programs
    
    export JAVA_HOME=/opt/java8
    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=/opt/java8/jre/lib/amd64/libjsig.so:/opt/java8/jre/lib/amd64/server/libjvm.so:/opt/java8/jre/lib/amd64/server:/opt/java8/jre/lib/amd64
    PATH=$PATH:$HOME/bin:$JAVA_HOME:$JAVA_HOME/bin:$OGG_HOME
    
    export PATH
    [ogg@bigdata01 ~]$ 
    
    #登录并用命令创建ogg所需文件夹
    [ogg@bigdata01 ogg]$ ./ggsci 
    
    Oracle GoldenGate Command Interpreter
    Version 12.2.0.1.160419 OGGCORE_12.2.0.1.0OGGBP_PLATFORMS_160430.1401
    Linux, x64, 64bit (optimized), Generic on Apr 30 2016 16:21:34
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2016, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (bigdata01) 1> info all
    
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
    
    MANAGER     STOPPED                                           
    
    
    GGSCI (bigdata01) 2> create subdirs
    
    Creating subdirectories under current directory /opt/ogg
    
    Parameter files                /opt/ogg/dirprm: created
    Report files                   /opt/ogg/dirrpt: created
    Checkpoint files               /opt/ogg/dirchk: created
    Process status files           /opt/ogg/dirpcs: created
    SQL script files               /opt/ogg/dirsql: created
    Database definitions files     /opt/ogg/dirdef: created
    Extract data files             /opt/ogg/dirdat: created
    Temporary files                /opt/ogg/dirtmp: created
    Credential store files         /opt/ogg/dircrd: created
    Masterkey wallet files         /opt/ogg/dirwlt: created
    Dump files                     /opt/ogg/dirdmp: created
    
    #配置mgr
    GGSCI (bigdata01) 3> view param MGR
    PORT 7809
    DYNAMICPORTLIST 7810-7909
    --定期清理dirdat路径下的本地队列,保留期限10天,过期后自动删除。
    PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
    LAGREPORTHOURS 1
    LAGINFOMINUTES 30
    LAGCRITICALMINUTES 45
    
    #开启mgr
    GGSCI (bigdata01) 4> start mgr
    Manager started.
    
    
    GGSCI (bigdata01) 5> info all
    
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
    
    MANAGER     RUNNING  
    
    #配置kafka
    [ogg@bigdata01 big-data]$ cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
    [ogg@bigdata01 big-data]$ cd $OGG_HOME/dirprm/
    [ogg@bigdata01 big-data]$ vi kafka.props 
    [ogg@bigdata01 big-data]$ cat kafka.props 
    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type = kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
    gg.handler.kafkahandler.TopicName =ogg_zwjfborrower
    gg.handler.kafkahandler.format =json
    gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
    gg.handler.kafkahandler.BlockingSend =false
    gg.handler.kafkahandler.includeTokens=false
    
    gg.handler.kafkahandler.mode =op
    #gg.handler.kafkahandler.maxGroupSize =100, 1Mb
    #gg.handler.kafkahandler.minGroupSize =50, 500Kb
    
    
    goldengate.userexit.timestamp=utc
    goldengate.userexit.writers=javawriter
    javawriter.stats.display=TRUE
    javawriter.stats.full=TRUE
    
    gg.log=log4j
    gg.log.level=INFO
    
    gg.report.time=30sec
    
    #Sample gg.classpath for Apache Kafka
    gg.classpath=dirprm/:/data/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/*
    #Sample gg.classpath for HDP
    #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
    
    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar 
    
    #配置custom_kafka_producer.properties
    [ogg@bigdata01 dirprm]$ cat custom_kafka_producer.properties 
    bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092
    acks=1
    compression.type=gzip
    reconnect.backoff.ms=1000
    
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    # 100KB per partition
    batch.size=102400
    linger.ms=10000
    
    #定义replication
    GGSCI (bigdata01) 2> view param RZWJFBOR
    REPLICAT RZWJFBOR
    Setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
    SOURCEDEFS dirdef/source.def
    REPORTCOUNT EVERY 1 MINUTES, RATE
    GROUPTRANSOPS 10000
    MAP ZWJFBORROWER.*, TARGET ZWJFBORROWER.*;
    
    #指定Trail文件
    GGSCI (bigdata01) 2> add replicat RZWJFBOR, exttrail ./dirdat/zb
    #启动replicat进程
    GGSCI (bigdata01) 2>start replicat RZWJFBOR
    GGSCI (bigdata01) 3> info all
    
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
    
    MANAGER     RUNNING                                           
    REPLICAT    RUNNING     RZWJFBOR    00:00:00      00:00:08    
    

    三、测试数据是否到kafka

    [root@bigdata01 ~]# /data/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/bin/kafka-console-consumer  --zookeeper bigdata01:2181 --topic ogg_zwjfborrower --from-beginning 
    

    相关文章

      网友评论

          本文标题:OGG实时同步数据到kafka配置

          本文链接:https://www.haomeiwen.com/subject/kaywbxtx.html