美文网首页Mongo
Mongo新机制ChangeStream解决数据同步迁移的问题

Mongo新机制ChangeStream解决数据同步迁移的问题

作者: 十五倍压枪 | 来源:发表于2019-06-04 16:02 被阅读5次

    问题背景

    在公司中,不同业务部门之间有时会像一个个孤岛,无法连接起来,各自为战,各自部门之间的工作成果以及数据因为格式问题,完整性问题,很难给到其他部门去复用。当然,正常情况下,应该是有专门的架构组童鞋去做一些数据的整合,各部门业务之间的数据协商统一之类的工作。不过这次面临的问题就是,如何在没有外部资源协助下,如何完整的将X部门存档在mongo并实时写入的数据完整同步到本部门的mongo集群中并使用呢?

    问题分析

    涉及到数据同步这方面的问题其实主要要考虑的是两个方面,历史数据和实时数据。最朴素的想法其实也很简单,以一个时间节点做为分界点,在这个时间节点截下mongo库的数据快照并dump,这个时间节点后的数据可以通过程序记录游标定时范围查询控制返回。
    这是一个比较基础的思路,主要存在以下问题:

    1. 实时性相当差,我们这里的数据的新鲜程度完全依赖于这个“定时”。
    2. 逻辑上比较绕,而且依赖于X部门有对数据做时间戳,那如果下一次要同步的数据没有写时间戳,怎么办呢?
    3. 开发工作比较麻烦,调试上也需要步步谨慎,数据量大情况下又很难看出哪里有问题,只能等待问题来找你,很被动。

    解决方案

    Mongo3提供了新的特性非常契合我现在的场景,这个新特性的名字叫做ChangeStream。首先我先放上官方文档
    MongoDB-changeStreams
    然后再开始胡咧咧自己的理解。
    这个一个可以在MongoClient(包括Java,python等)去订阅Mongo数据库中某张表最新事件变化的特性,相当于以事件驱动机制对Collection的变化做监听,化主动为被动。是的我没有写错,原先朴素的思路有提到,我们需要主动定时去用时间游标去捞取最新的数据,而现在变成了只需要“被动”等待collection的变化消息过来即可。

    举个例子:当我们监控数据库的CollectionA时,每当A被写入一条数据(插入,更新,删除等),我们也能马上收到这条消息,根据消息进行自定义的操作,比如A表的删除数据你可以只做更新字段来表示,灵活度和实时性都非常实用。

    实验报告

    这里放上对该特性的一些实地测试。
    这里是最初的python版测试代码,可以提供参考,但是实际的测试实验是用java-client完成的,逻辑也是类似。

    import pymongo
    
    monclient = pymongo.MongoClient("mongodb://user:pass:port/")
    db = monclient["col"]
    
    cursor = db['col'].watch()
    
    while True:
        doc = next(cursor)
        print(doc)
    

    然后在监控表中插入一条数据后,可以看到下图:

    插入数据.png
    这里在程序的运行过程中也说明了一点,监控过程中cursor.hasNext()这个方法是阻塞等待新的消息进来的,可以放心使用

    继续做测试,接下来是更新消息:


    更新数据.png

    很好,基本都可以做到实时响应,上面这两图可以配合下面的官方文档中的Document基本格式食用。

    {
       _id : { <BSON Object> },
       "operationType" : "<operation>",
       "fullDocument" : { <document> },
       "ns" : {
          "db" : "<database>",
          "coll" : "<collection"
       },
       "to" : {
          "db" : "<database>",
          "coll" : "<collection"
       },
       "documentKey" : { "_id" : <value> },
       "updateDescription" : {
          "updatedFields" : { <document> },
          "removedFields" : [ "<field>", ... ]
       }
       "clusterTime" : <Timestamp>,
       "txnNumber" : <NumberLong>,
       "lsid" : {
          "id" : <UUID>,
          "uid" : <BinData>
       }
    }
    

    在Java的API中可以看到,是有一些变形的,但是不影响使用,还是可以很轻松分辨出来API中对应的字段的。
    接下来是一个我比较关心的特性,就是如何进行断点续传
    程序的关闭启动是很正常的需求和操作,在程序关闭后如何在再次启动后可以直接延续上次进度而不会丢失这段时间内的数据就是一个非常重要的问题了。在官方文档中提到了resumeToken来解决这一场景。
    简单的说,resumeToken是一个记录点,相当于游戏中的存档,我们可以从resumeToken的位置重新开始消费日志。

    Use this document as a resumeToken for the resumeAfter parameter when resuming a change stream.

                ChangeStreamDocument<Document> stream = cursor.next();
                resumeToken = stream.getDocumentKey();
                resumeToken = stream.getResumeToken();
    

    可以看到,stream提供了一个获得resumeToken的方法……对JAVA的API中的getResumeToken()获得的resumeToken进行观察,发现有以下特点:

    1. 并不会随着doc的变化而发生变化,而是一直是一个固定值
    2. 也可以从断掉的那一刻获得数据
    3. 时间长了会失效
    关闭程序后重启接收到了关闭期间写入的消息

    核心代码

            MongoCursor<ChangeStreamDocument<Document>> cursor = null;
    
            MongoCollection<Document> collection = originMongoDao.getMongoCollection();
    
            BsonDocument resumeToken = targetMongoDao.getResumeToken();
            if (resumeToken != null){
                cursor = collection.watch().resumeAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
            }else {
                cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
            }
    
            long time1 = 0l;
            long time2 = 0l;
    
            int insert = 0;
            int update = 0;
            int delete = 0;
            int others = 0;
    
    
            AtomicInteger count = new AtomicInteger(0);
    
            while (cursor.hasNext()){
                ChangeStreamDocument<Document> stream = cursor.next();
                resumeToken = stream.getDocumentKey();
    
                Document doc = stream.getFullDocument();
    
                MtxxFeature mtxxFeature = new MtxxFeature();
                switch (stream.getOperationType())
                {
                    case
                            INSERT:insert += 1;
                            mtxxFeature.setMongo__delete(false);
                    break;
                    case
                            UPDATE:update += 1;
                            mtxxFeature.setMongo__delete(false);
                    break;
                    case
                            DELETE:delete += 1;
                            mtxxFeature.setMongo__delete(true);
                    break;
                    default: others += 1;
                             mtxxFeature.setMongo__delete(false);
                    break;
                }
    
          
                log.info("id:{} end [insert:{} update:{} delete:{} others:{}]", mtxxFeature.getMongo_id(), insert, update, delete, others);
    
    
            }
        }
    

    相关文章

      网友评论

        本文标题:Mongo新机制ChangeStream解决数据同步迁移的问题

        本文链接:https://www.haomeiwen.com/subject/yealxctx.html