美文网首页
Flink-mongodb-cdc 同步数据到Kafka

Flink-mongodb-cdc 同步数据到Kafka

作者: 我还是老油条 | 来源:发表于2023-07-25 18:21 被阅读0次

    1.项目中需要同步mongodb中数据到kafka ,然后再根据同步的数据进行后续操作,需要验证方案是否可行

    2.首先搭建mongodb 集群,因为flink-mongodb-cdc只有在集群模式下才能运行
    MongoDB CDC Connector 是基于 MongoDB Change Streams 特性来实现的。MongoDB 是一个分布式的数据库,在分布式的环境中,集群成员之间一般会进行相互复制,来确保数据的完整性和容错性。与 MySQL 的 Binlog 比较类似,MongoDB 也提供了 oplog 来记录数据的操作变更,次要节点之间通过访问主节点的变更日志来进行数据的同步

    3.搭建mongodo集群
    mongodb 使用4.2.24版本
    一共两台就行
    第一台配置修改(注意logpath和dbpath 需要自己建文件)

    logpath=/root/server/mongomatser/log/mongodb.log
    dbpath=/root/server/mongomatser/data
    #verbose = true
    #vvvv=true
    port = 29001
    bind_ip = 127.0.0.1
    replSet = repltest
    fork = true
    logappend=true
    shardsvr=true
    oplogSize=10000
    logRotate=reopen
    
    
    

    第二台配置修改

    logpath=/root/server/mongoslave/log/mongodb.log
    dbpath=/root/server/mongoslave/data
    #verbose = true
    #vvvv=true
    port = 29002
    bind_ip = 127.0.0.1
    replSet = repltest
    fork = true
    logappend=true
    shardsvr=true
    oplogSize=10000
    logRotate=reopen
    
    

    4.启动并登录mongodb

    ./mongod --config /server/mongoslave/mongo/conf/mongo.conf
    
    /root/mongodb/bin/mongo --host 127.0.0.1  --port 27017
    
    config={_id:”mongoTest”,members:[{_id:0,host:”127.0.0.1:27017”},{_id:1,host:”127.0.01:27017”},]}
    
    rs.initiate(config)
    
    rs.status()
    

    5.navicat 登录,测试主从同步是否成功

    6.github上下载flink-1.17.1-bin-scala_2.12.tgz
    解压后,往Lib 下放入
    flink-sql-connector-mongodb-cdc-2.4.0.jar
    flink-sql-connector-kafka-1.16.1.jar
    flink-json-1.17.1.jar

    修改配置flink-conf.yaml

    rest.port: 8581
    
    rest.address: 127.0.0.1(需要修改为本机ip)
    
    rest.bind-address: 127.0.0.1 (需要修改为本机ip)
    
    

    启动flink : cd bin ./start-cluster.sh (一定要先启动flink,否则后面提交任务无法看到报错)

    7.提交任务

    CREATE TABLE source_user ( 
    _id    String,
    name    String,
    age    INT,
    PRIMARY KEY (_id) not ENFORCED
    )
    WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = '127.0.0.1:29001,127.0.0.1:29002',
    'database' = 'test',
    'collection' = 'user')
    
    
    CREATE TABLE kafka_source_user ( 
    _id String,name String,age INT,PRIMARY KEY (_id) not ENFORCED
    )
    WITH (
    'connector' = 'upsert-kafka'
    ,'topic' = 'mongodb-test-kafka-001'
    ,'properties.bootstrap.servers' = '127.0.0.1:9092'
    ,'key.format' = 'json'
    ,'value.format' = 'json' )
    
    insert into kafka_source_user
    select _id,name,age
    from source_user
    
    

    提交成功页面有任务生成


    image.png

    8.在mongodb中新增或者修改数据,kafka就会接收到一条消息,同步成功

    相关文章

      网友评论

          本文标题:Flink-mongodb-cdc 同步数据到Kafka

          本文链接:https://www.haomeiwen.com/subject/vaehpdtx.html