随着业务的增加,数据库中表(集合)太多不便于管理等问题,会出现数据库表迁移的需求,即将老数据库中的一些表迁移到新数据库中。此次测试的迁移就是这种类型的数据迁移。
- 验证点:
1.验证迁移前后的新旧表中的数据是否一致:包括总量是否一致、表中每条数据的内容是否一致。
2.切换服务中的数据库表后,数据在新表中读写正常,相关业务不会再使用老表。
迁移及过程:
(图:ocr数据迁移.png)
数据比较脚本:
1.连接数据库
client = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/') ## pub
client.platform.authenticate("username", "password") # 注意这里是对具体的数据库进行鉴权,platform是数据库的名字
db = client["platform"]
table3 = 'ocr_universal_imgs'
table_old = db[table3]
table_new = db2[table3]
cmp_table(table_old, table_new, "_id", table3)
2.获取updatetime大于某个时间的数据,返回的是一个游标cursor
med_table.find({"updatetime": {"$gt": datetime.datetime(2019, 3, 28, 10, 11, 0, 508000)}})
3.根据某个域的值获取一条数据
after_str = med_table2.find_one({unique_id: uid}) # 使用唯一标识在新数据库表中查找对应的数据
-
测试环境增量数据模拟
由于测试环境数据量相对较小,在测试增量的时候需要模拟增量数据(新增和更新的数据),可以使用脚本执行。
思考:
1.注意打印运行到第几条数据的信息,因为数据很大时,会耗时很久,打印了过程数据,能知道验证的进度。
2.提前指定计划,考虑周全,好的计划可以很好的指引测试行动。
3.提前预估每个阶段的执行时间,对整个迁移时间有大致的掌握。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pymongo
import logging
import datetime
# Configure file logging for the whole comparison run.
logging.basicConfig(level=logging.DEBUG, # root logger level: log everything from DEBUG up
filename='ocrMigrationLog.log',
filemode='a', # 'w' rewrites the log file on every run, overwriting the previous log;
# 'a' appends, and is also the default when filemode is omitted
format=
'%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
# log record format: timestamp - path[line] - level: message
)
# Collection names to compare (real names redacted in this write-up).
dictTable = ['xxx', 'xxxx']
table3 = 'xxxxx'
# Connect to the source database "platform" (pre-migration, alias platform2 in the logs).
# client = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/') # qa
# client.platform.authenticate("username", "password") # auth is per-database; "platform" is the database name
client = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/') ## pub
# Auth is done against the specific database; "platform" is the database name.
# NOTE(review): Database.authenticate() was removed in pymongo 4.x — this
# script requires an older pymongo; confirm the deployed driver version.
client.platform.authenticate("username", "password")
db = client["platform"]
# Connect to the target database "ocr" (post-migration).
# client2 = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/') # qa
# client2.ocr.authenticate("username", "password")
client2 = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/') # pub
client2.ocr.authenticate("username", "password")
db2 = client2['ocr']
def cmp_table(med_table, med_table2, unique_id, table_name,
              since=datetime.datetime(2019, 3, 28, 10, 11, 0, 508000),
              sample_every=1):
    """Compare one migrated collection against its source.

    For every source document whose ``updatetime`` is later than ``since``,
    look up the matching document in the target collection by ``unique_id``
    and compare the full document contents.  Totals and every mismatch are
    printed and written to the log file.

    :param med_table: source (pre-migration) pymongo collection
    :param med_table2: target (post-migration) pymongo collection
    :param unique_id: field name used to match documents (e.g. "_id")
    :param table_name: collection name, used only in messages
    :param since: only documents updated after this time are compared
        (default keeps the originally hard-coded cutoff)
    :param sample_every: compare only every N-th document (1 = compare all,
        generalizes the original always-true ``count % 1 == 0`` sampling hook)
    :return: tuple ``(count_success, count_fail, count_fail_cannot_find)``
    """
    print(table_name + "的迁移测试结果如下:")
    logging.info(table_name + "的迁移测试结果如下:")
    count_fail = 0
    count_success = 0
    count_fail_cannot_find = 0
    count = 0
    # NOTE(review): Cursor.count() is deprecated/removed in modern pymongo;
    # kept here for compatibility with the driver version this script targets.
    before_count = med_table.find().count()
    print("老数据库platform2的表数据个数:" + str(before_count))
    logging.info("老数据库platform2的表数据个数:" + str(before_count))
    after_count = med_table2.find().count()
    print("新数据库ocr的表数据个数:" + str(after_count))
    logging.info("新数据库ocr的表数据个数:" + str(after_count))
    if before_count == after_count:
        print('老数据库表和新数据库表个数相同')
        logging.info('老数据库表和新数据库表个数相同')
    else:
        print('老数据库表和新数据库表个数不同' + "\n")
        logging.info('老数据库表和新数据库表个数不同' + "\n")
    # Walk every source document newer than the cutoff.
    for doc in med_table.find({"updatetime": {"$gt": since}}):
        count += 1
        # Print progress so very long runs show how far the check has got.
        print(count)
        if count % sample_every == 0:
            try:
                uid = doc[unique_id]  # unique key of this document
            except KeyError:  # narrowed from the original bare except
                print("unique_id未获取到,第" + str(count) + "条")
                logging.error("unique_id未获取到,第" + str(count) + "条")
            else:
                # Look up the counterpart in the new collection by unique key.
                after_str = med_table2.find_one({unique_id: uid})
                if after_str:
                    # Whole-document equality replaces the Python-2-only
                    # cmp(); the original only tested cmp(...) == 0 (and did
                    # so with the buggy identity check "is not 0"), so a
                    # plain != is equivalent and portable.
                    if doc != after_str:
                        count_fail += 1
                        print("新表老表数据值不匹配的uid: " + str(uid))
                        logging.error("新表老表数据值不匹配的uid: " + str(uid))
                        print(doc)
                        print(after_str)
                    else:
                        count_success += 1
                else:
                    print("在新表中找不到的uid: " + str(uid))
                    logging.error("在新表中找不到的uid: " + str(uid))
                    count_fail_cannot_find += 1
    print('count_fail:' + str(count_fail))
    logging.info('count_fail:' + str(count_fail))
    print('count_fail_cannot_find:' + str(count_fail_cannot_find))
    logging.info('count_fail_cannot_find:' + str(count_fail_cannot_find))
    print('count_success:' + str(count_success) + "\n")
    logging.info('count_success:' + str(count_success) + "\n")
    return count_success, count_fail, count_fail_cannot_find
# Run the comparison over every migrated collection: the ones listed in
# dictTable, then table3 — all matched on "_id".
for name in dictTable + [table3]:
    table_old = db[name]
    table_new = db2[name]
    cmp_table(table_old, table_new, "_id", name)
增量数据模拟:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import uuid
import logging

import pymongo
from dateutil import parser

# Portability fix: the original imported lowercase true/false from
# Carbon.Aliases — a Mac-only, long-deprecated toolbox module that crashes
# the script on any other platform.  Plain aliases keep the rest of the
# script (which uses the lowercase names) working everywhere.
true, false = True, False

# Log everything from this incremental-data run to its own file.
logging.basicConfig(
    level=logging.DEBUG,  # root logger level
    filename='ocrMigrationUpdateLog.log',
    filemode='a',  # 'a' appends (also the default); 'w' would truncate each run
    # Record format: timestamp - path[line] - level: message
    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
)
# Collection names (redacted).
table1 = 'xxx'
table2 = 'xxxx'  # bug fix: the original line was missing the closing quote (syntax error)
table3 = 'xxxxx'
# Connect to the source database "platform" (pre-migration).
client = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')
# Auth is done against the specific database; "platform" is the database name.
# NOTE(review): Database.authenticate() was removed in pymongo 4.x — requires
# an older driver; confirm the deployed pymongo version.
client.platform.authenticate("username", "password")
db = client["platform"]
# Connect to the target database "ocr" (post-migration).
client2 = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')
client2.ocr.authenticate("username", "password")
db2 = client2['ocr']
def update_table(med_table, unique_id, table_name, updatename):
    """Simulate an incremental update: touch one document in *med_table*.

    Sets the field ``updatename`` to a fixed test value and bumps
    ``updatetime`` so the migration's incremental sync will pick the
    document up.

    :param med_table: pymongo collection to update
    :param unique_id: field used to address the document (e.g. "uid" or "_id")
    :param table_name: collection name, used only in messages
    :param updatename: name of the field to overwrite with the test value
    """
    dateStr = '2019-06-12T00:00:00.000Z'
    myDatetime = parser.parse(dateStr)  # tz-aware UTC datetime
    print(table_name + "的迁移测试结果如下:")
    logging.info(table_name + "的迁移测试结果如下:")
    count = 0
    # Only one document is touched (limit(1)).
    for i in med_table.find().limit(1):
        count += 1  # bug fix: the original never incremented count
        try:
            uid = i[unique_id]  # unique key of this document
        except KeyError:  # narrowed from the original bare except
            print("unique_id未获取到,第" + str(count) + "条")
        else:
            print(uid)
            # bug fix: the original issued the update even after a failed
            # lookup, which would raise NameError on the unbound uid; the
            # update now runs only when the key was found.
            # NOTE(review): Collection.update() is the pymongo 2.x API.
            update_result = med_table.update(
                {unique_id: uid},
                {"$set": {updatename: "test19", "updatetime": myDatetime}})
            logging.info("update表" + table_name + "结果:" + str(update_result))
def insert_table1(med_table):
    """Simulate incremental data: insert one new test document into 'xxx'.

    The document gets a fresh uuid1 and a fixed, tz-aware updatetime so the
    incremental migration can find it.
    """
    dateStr = '2019-06-12T00:00:00.000Z'
    myDatetime = parser.parse(dateStr)  # tz-aware UTC datetime
    uid = str(uuid.uuid1())
    # Portability fix: use Python's own True/False instead of the lowercase
    # true/false names the original pulled from the Mac-only Carbon module.
    insert_result1 = med_table.insert({"uid": uid, "updatetime": myDatetime,
                                       "createtime": myDatetime,
                                       "status": 1, "complete": True,
                                       "valid": True, "reported": False})
    print(insert_result1)
    logging.info("insert表'xxx'" + "结果:" + str(insert_result1))
def insert_table2(med_table):
    """Simulate incremental data: insert one new test document into 'xxxx'."""
    # Fix: dropped the redundant function-level `from dateutil import parser`;
    # the module already imports it at the top of the file.
    dateStr = '2019-06-12T00:00:00.000Z'
    myDatetime = parser.parse(dateStr)  # tz-aware UTC datetime
    uid = str(uuid.uuid1())
    print(uid)
    # NOTE(review): unlike the sibling insert helpers this passes a
    # one-element list, so insert() returns a list of ids rather than a
    # single id — preserved as-is.
    insert_result2 = med_table.insert([{"uid": uid, "record_id": "112383",
                                        "type": "sh4",
                                        "updatetime": myDatetime,
                                        "createtime": myDatetime,
                                        "status": 2, "__v": 0,
                                        "reason": ""}])
    print(insert_result2)
    logging.info("insert表xxxx" + "结果:" + str(insert_result2))
def insert_table3(med_table):
    """Simulate incremental data: insert one new test document into 'xxxxx'."""
    # Fix: dropped the redundant function-level `from dateutil import parser`;
    # the module already imports it at the top of the file.
    dateStr = '2019-06-12T00:00:00.000Z'
    myDatetime = parser.parse(dateStr)  # tz-aware UTC datetime
    uid = str(uuid.uuid1())
    print(uid)
    # Portability fix: use Python's own True/False instead of the lowercase
    # true/false names the original pulled from the Mac-only Carbon module.
    insert_result3 = med_table.insert({"__v": 0,
                                       "label": "上传图片",
                                       "type": "test12",
                                       "ignore": False,
                                       "updatetime": myDatetime,
                                       "createtime": myDatetime,
                                       "status": 0,
                                       "valid": True})
    print(insert_result3)
    logging.info("insert表xxxxx" + "结果:" + str(insert_result3))
# Drive the incremental-data simulation: for each of the three collections,
# update one existing document, then insert one brand-new document.
for name, key_field, touched_field, insert_fn in (
        (table1, "uid", "project_id", insert_table1),
        (table2, "uid", "name", insert_table2),
        (table3, "_id", "type", insert_table3),
):
    table_old = db[name]
    update_table(table_old, key_field, name, touched_field)
    insert_fn(table_old)
网友评论