美文网首页编程实践
Mongodb实现应用级集合数据同步(Nodejs版)

Mongodb实现应用级集合数据同步(Nodejs版)

作者: 全栈顾问 | 来源:发表于2020-12-21 11:58 被阅读0次

tms-mongodb-web(MongoDB的Web客户端,可对数据库进行可视化管理)项目中,通过replica set实现应用级集合复制能力。本文介绍了项目中使用的一些关键方法:用docker搭建MongoDB复制集;使用ChangeStream监听集合更新;使用子进程执行实时复制;MongoDB命令行执行js脚本等。

项目地址:https://github.com/jasony62/tms-mongodb-web

目标

允许用户在系统运行时建立或删除集合间的复制关系,基于这个关系,用户可手动执行全量复制(通过API),系统可自动执行实时增量复制(独立线程)。典型应用场景包括:1、将多个分散集合中的数据集中到同一个集合中,便于实现全局查找和处理;2、跟踪集合的数据变化,用异步线程对数据进行加工,减少单段代码的复杂度,例如:记录业务级日志。

本文主要介绍了多集合数据汇聚场景的实现方法。

MongoDB复制集

MongoDB自带replica set功能,用于实现多个MongoDB实例间数据的实时复制,从而实现数据备份和高可用。

复制集中的实例分为3种类型:Primary(主)Secondaries(从)Arbiter(仲裁)。客户端所有写操作都要通过Primary节点,默认情况下读操作也都通过主节点(客户端可以设置为支持多读);Secondaries节点从主节点实时接收变更数据;Arbiter节点不复制数据也不能成为主节点,只是参与投票。复制集最小配置通常采用一主两从的结构(PSS),但是如果资源有限,可以采取一主一从一仲裁的结构(PSA)。主节点执行写操作时会生成oplog,从节点通过执行oplog复制数据。当主节点不可用时,从节点可以通过选举机制成为从主节点。

建立复制集

下面说明如何在本地开发环境中通过docker建立复制集,从而简化开发过程。

开发环境只是为了进行功能验证,所以可以将复制集设置为PSA结构,这样占用的资源量最小。配置复制集前,需要先启动3个MongoDB的实例,我们通过docker-compose执行(下述命令行用于示例,实际执行请参照项目中的说明文档)。

docker-compose up mongodb mongodb-s mongodb-a

在docker-compose文件中需要将MongoDB的启动方式设置为复制集方式

entrypoint: [ "mongod", "--bind_ip_all", "--port", "37017", "--replSet", "tmw-rs" ]

为实现复制集的自动初始化,新建init_replicate.js文件,内容如下:

 rs.initiate({
  _id: 'tmw-rs', // 复制集名称
  members: [
    {
      _id: 0,
      host: 'host.docker.internal:37017', // 用host.docker.internal实现容器外部可以访问
      priority: 1,
    },
    {
      _id: 1,
      host: 'host.docker.internal:37018',
      priority: 0, // 不能作为主节点
    },
    {
      _id: 2,
      host: 'host.docker.internal:37019',
      arbiterOnly: true, // 作为仲裁节点
    },
  ],
})

注意:第二个节点设置priority: 0,使从节点不能升级主节点,保证每次重启后主节点都是固定的,便于调试。第三个节点设置为仲裁节点。

为了能够在容器外连接复制集,需要在主机上的hosts文件中添加host.docker.internal

新建文件init_replicate.sh,执行复制集初始化:

#!/bin/sh

# 复制配置文件到容器中
docker cp ./init_replicate.js tms-mongodb-mongo:/home/int_replicate.js

# 按照配置文件初始化复制集
docker exec -it tms-mongodb-mongo mongo --port 37017 /home/int_replicate.js

/bin/sh init_replicate.sh

测试复制集

连接主节点

mongo --port 37017

在主节点中添加数据

devrs:PRIMARY> db.cl01.insertOne({x: 1})

连接从节点查看数据

mongo --port 37018

允许从节点读

rs.secondaryOk()

查看是否已有复制的数据

devrs:SECONDARY> db.cl01.find()

连接到仲裁节点

show dbs

可以看到里面只有local数据库,并没有同步的数据库db01

客户端连接

MongoClient.connect('mongodb://host.docker.internal:37017,host.docker.internal:37018,host.docker.internal:37019/?replicaSet=tmw-rs')

监听集合变化

MongoDB 3.6版本开始提供ChangeStream,它可以订阅单个集合,单个数据库或者整个部署实例的数据变化。

const cl = mongoClient.db('tms_admin').collection('replica_map')
ReplicaMapWatcher = cl.watch([], { fullDocument: 'updateLookup' })

上面的代码是订阅集合tms_admin.replica_map的数据变化,集合的watch方法创建ChangeStream实例。

ReplicaMapWatcher.on('change', async (csEvent) => {
  ......
})

当数据发生变化时,会产生如下事件:

{
  operationType: 'insert',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960043 },
  fullDocument: { _id: 5fc843abdf2330d645686e0f, x: 5 },
  ns: { db: 'db01', coll: 'cl01' },
  documentKey: { _id: 5fc843abdf2330d645686e0f }
}
{
  operationType: 'delete',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960379 },
  ns: { db: 'db01', coll: 'cl01' },
  documentKey: { _id: 5fc843abdf2330d645686e0f }
}

注意deleteMany方法会产生多个单条事件。

{
  operationType: 'update',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960612 },
  ns: { db: 'db01', coll: 'cl01' },
  documentKey: { _id: 5fc84245df2330d645686e0c },
  updateDescription: { updatedFields: [Object], removedFields: [] }
}

创建ChangeStream时可以设置选项fullDocument参数,这样在update事件中就可以返回document的完整信息。

{
  operationType: 'replace',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960924 },
  fullDocument: { _id: 5fc84245df2330d645686e0c, x: 9 },
  ns: { db: 'db01', coll: 'cl01' },
  documentKey: { _id: 5fc84245df2330d645686e0c }
}

实现

记录数据复制关系

首先,建立集合tms_admin.replica_map记录复制关系。用户可以自己建立或删除集合间的复制关系。系统利用ChangeStream订阅tms_admin.replica_map的变化情况,自动建立或关闭集合间的实时复制。

字段 说明 类型 必填
_id mongodb内部id ObjectId
primary 主集合。 object
primary.db 主集合数据库名称。 string
primary.cl 主集合名称。 string
secondary 从集合。 object
secondary.db 从集合数据库名称。 string
secondary.cl 从集合名称。 string

进行数据复制时,为了追踪数据来源,系统在从集合中添加记录数据来源的字段__pri

内置字段 说明 必填 类型
__pri object
__pri.db 文档所属数据库名称(sysname)。 string
__pri.cl 文档所属集合名称(sysname)。 string
__pri.id 文档原始 id(_id)。 ObjectId
__pri.time 最近一次复制时间。 13 位整型

全量复制

因为在从集合中记录了文档来源__pri,复制时,先检查主集合的数据是否已经存在,存在就替换,否则插入新数据。

secSysCl.replaceOne({ '__pri.id': _id }, doc, { upsert: true })

从集合数据来源包含了文档最后1次更新的时间戳time,更新时间早于这个时间戳的文档,是主集合中已经删除的文档,应该在从集合中删除。

secSysCl
      .deleteMany({
        '__pri.db': pri.db,
        '__pri.cl': pri.cl,
        '__pri.time': { $not: { $eq: syncAt } },
      })

代码:/back/models/mgdb/replicaMap.js

实时复制

Nodejs是单线程的,虽然它的执行效率很高,但是如果有频繁的数据更新操作要处理,多少还是会影响处理效率,因此使用child_process将实时复制放到独立的线程中执行。

实时复制功能要求MongoDB必须部署为复制集方式,TMW设置了环境变量TMW_REALTIME_REPLICA,控制是否支持实时复制功能。TMW启动时,检查环境变量TMW_REALTIME_REPLICA是否有效,如果有就启动实时复制子线程。

const cp = require('child_process')
Replica_Child_Process = cp.spawn('node', ['./replica/watch.js'], {
  detached: true,
  stdio: 'ignore',
})
Replica_Child_Process.unref()

这里设置为子线程独立于主线程的方式,复制线程中的日志无法发送的主线程,因此,为了便于查看日志输出,应该通过配置文件/back/config/log4js.js将日志输出到文件。

module.exports = {
  appenders: {
    consoleout: { type: 'console' },
    fileout: {
      type: 'file',
      filename: './logs/back-logs.log',
      maxLogSize: 1024 * 1024,
    },
  },
  categories: {
    default: {
      appenders: ['consoleout', 'fileout'],
      level: process.env.TMS_APP_LOG4JS_LEVEL || 'debug',
    },
  },
}

代码:/back/replica/watch.js

命令行脚本复制

可以通过命令行在程序之外执行对MongoDB的操作,例如:初始化,生成测试数据等。

项目中自带了执行全量复制的js脚本,便于系统已经初始化操作。

mongo --port 37017 ./back/replica/synchronize.js

代码:/back/replica/synchronize.js

参考

https://docs.mongodb.com/manual/changeStreams/

相关文章

网友评论

    本文标题:Mongodb实现应用级集合数据同步(Nodejs版)

    本文链接:https://www.haomeiwen.com/subject/aiwhwktx.html