美文网首页
ocr迁移测试总结

ocr迁移测试总结

作者: 海的那一边 | 来源:发表于2019-04-02 17:05 被阅读0次

随着业务的增加,数据库中表(集合)太多不便于管理等问题,会出现数据库表迁移的需求,即将老数据库中的一些表迁移到新数据库中。此次测试的迁移就是这种类型的数据迁移。

  • 验证点:
    1.验证迁移前后的新旧表中的数据是否一致:包括总量是否一致、表中每条数据的内容是否一致。
    2.切换服务中的数据库表后,数据在新表中读写正常,相关业务不会再使用老表。
  • 迁移及过程:


    ocr数据迁移.png
  • 数据比较脚本:
    1.连接数据库
client = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')  ## pub
client.platform.authenticate("username", "password")  # 注意这里是对具体的数据库进行鉴权,platform是数据库的名字
db = client["platform"]
table3 = 'ocr_universal_imgs'
table_old = db[table3]
table_new = db2[table3]
cmp_table(table_old, table_new, "_id", table3)

2.获取updatetime大于某个时间的数据,返回的是一个游标cursor

med_table.find({"updatetime": {"$gt": datetime.datetime(2019, 3, 28, 10, 11, 0, 508000)}})   

3.根据某个域的值获取一条数据

after_str = med_table2.find_one({unique_id: uid})  # 使用唯一标识在新数据库表中查找对应的数据    
  • 测试环境增量数据模拟
    由于测试环境数据量相对较小,在测试增量的时候需要模拟增量数据(新增和更新的数据),可以使用脚本执行。

  • 思考:
    1.注意打印运行到第几条数据的信息,因为数据很大时,会耗时很久,打印了过程数据,能知道验证的进度。
    2.提前指定计划,考虑周全,好的计划可以很好的指引测试行动。
    3.提前预估每个阶段的执行时间,对整个迁移时间有大致的掌握。

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import pymongo
import logging
import datetime
logging.basicConfig(level=logging.DEBUG,  # 控制台打印的日志级别
                                    filename='ocrMigrationLog.log',
                                    filemode='a',  # 模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
                                    # a是追加模式,默认如果不写的话,就是追加模式
                                    format=
                                    '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
                                    # 日志格式
                                    )

dictTable = ['xxx', 'xxxx']
table3 = 'xxxxx'


# 连接数据库platform2迁移前

# client = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')  # qa
# client.platform.authenticate("username", "password")  # 注意这里是对具体的数据库进行鉴权,platform是数据库的名字
client = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')  ## pub
client.platform.authenticate("username", "password")  # 注意这里是对具体的数据库进行鉴权,platform是数据库的名字
db = client["platform"]

# 连接数据库ocr迁移后

# client2 = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')  # qa
# client2.ocr.authenticate("username", "password")
client2 = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')  # pub
client2.ocr.authenticate("username", "password")
db2 = client2['ocr']


def cmp_table(med_table, med_table2, unique_id, table_name):
    print table_name + "的迁移测试结果如下:"
    logging.info(table_name + "的迁移测试结果如下:")
    count_fail = 0
    count_success = 0
    count_fail_cannot_find = 0
    count = 0

    before_count = med_table.find().count()
    print "老数据库platform2的表数据个数:" + str(before_count)
    logging.info("老数据库platform2的表数据个数:" + str(before_count))
    after_count = med_table2.find().count()
    print "新数据库ocr的表数据个数:" + str(after_count)
    logging.info("新数据库ocr的表数据个数:" + str(after_count))

    if before_count == after_count:
        print '老数据库表和新数据库表个数相同'
        logging.info('老数据库表和新数据库表个数相同')
    else:
        print '老数据库表和新数据库表个数不同' + "\n"
        logging.info('老数据库表和新数据库表个数不同' + "\n")

    # 遍历老数据库表的每条数据
    # for i in med_table.find():
    for i in med_table.find({"updatetime": {"$gt": datetime.datetime(2019, 3, 28, 10, 11, 0, 508000)}}):
        count += 1
        print count

        # 每隔几个数比较一次"start_time": "2019-03-28T10:11:38.844Z"
        if count % 1 == 0:
            before_str = i
            temp = unique_id
            try:
                uid = i[temp]  # 获取每条数据的唯一标识
            except:
                print "unique_id未获取到,第" + str(count) +"条"
                logging.error("unique_id未获取到,第" + str(count) +"条")

            else:
                after_str = med_table2.find_one({unique_id: uid})  # 使用唯一标识在新数据库表中查找对应的数据
                if after_str:
                    result = cmp(before_str, after_str)  # 比较新老数据是否正确
                    type(result)
                    if result is not 0:
                        count_fail += 1
                        print "新表老表数据值不匹配的uid: " + str(uid)
                        logging.error("新表老表数据值不匹配的uid: " + str(uid))
                        print i
                        print after_str

                    else:
                        count_success += 1
                else:

                    print "在新表中找不到的uid: " + str(uid)
                    logging.error("在新表中找不到的uid: " + str(uid))
                    count_fail_cannot_find += 1

    print 'count_fail:' + str(count_fail)
    logging.info('count_fail:' + str(count_fail))
    print 'count_fail_cannot_find:' + str(count_fail_cannot_find)
    logging.info('count_fail_cannot_find:' + str(count_fail_cannot_find))
    print 'count_success:' + str(count_success) + "\n"
    logging.info('count_success:' + str(count_success) + "\n")


#开始对表进行比对
for dict_table in dictTable:

    table_old = db[dict_table]
    table_new = db2[dict_table]
    cmp_table(table_old, table_new, "_id", dict_table)

table_old = db[table3]
table_new = db2[table3]
cmp_table(table_old, table_new, "_id", table3)

增量数据模拟:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import uuid
from Carbon.Aliases import false, true
import pymongo
import logging
from dateutil import parser


logging.basicConfig(level=logging.DEBUG,  # 控制台打印的日志级别
                    filename='ocrMigrationUpdateLog.log',
                    filemode='a',  # 模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
                    # a是追加模式,默认如果不写的话,就是追加模式
                    format=
                    '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
                    # 日志格式
                    )

table1 = 'xxx'
table2 = 'xxxx
table3 = 'xxxxx'

# 连接数据库platform2迁移前

client = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')
client.platform.authenticate("username", "password")  # 注意这里是对具体的数据库进行鉴权,platform是数据库的名字
db = client["platform"]

# 连接数据库ocr迁移后

client2 = pymongo.MongoClient('mongodb://xx.xx.xx.xx:27017/')
client2.ocr.authenticate("username", "password")
db2 = client2['ocr']


def update_table(med_table, unique_id, table_name, updatename):
    dateStr = '2019-06-12T00:00:00.000Z'
    myDatetime = parser.parse(dateStr)
    print table_name + "的迁移测试结果如下:"
    logging.info(table_name + "的迁移测试结果如下:")
    count = 0

    # 遍历老数据库表的每条数据
    for i in med_table.find().limit(1):
        temp = unique_id
        try:
            uid = i[temp]  # 获取每条数据的唯一标识
            print uid


        except:
            print "unique_id未获取到,第" + str(count) + "条"
        update_result = med_table.update({unique_id: uid}, {"$set": {updatename: "test19", "updatetime": myDatetime}})
        logging.info("update表" + table_name + "结果:" + str(update_result))


def insert_table1(med_table):
    dateStr = '2019-06-12T00:00:00.000Z'
    myDatetime = parser.parse(dateStr)
    uid = str(uuid.uuid1())
    insert_result1 = med_table.insert({"uid": uid,  "updatetime": myDatetime, "createtime": myDatetime,
                            "status": 1, "complete": true,
                            "valid": true, "reported": false})
    print insert_result1
    logging.info("insert表'xxx'" + "结果:" + str(insert_result1))


def insert_table2(med_table):
    from dateutil import parser
    dateStr = '2019-06-12T00:00:00.000Z'
    myDatetime = parser.parse(dateStr)
    uid = str(uuid.uuid1())
    print uid
    insert_result2 = med_table.insert([{"uid": uid, "record_id": "112383", "type": "sh4", "updatetime": myDatetime,
                             "createtime": myDatetime,
                             "status": 2, "__v": 0, 
                             "reason": ""}])
    print insert_result2
    logging.info("insert表xxxx" + "结果:" + str(insert_result2))


def insert_table3(med_table):
    from dateutil import parser
    dateStr = '2019-06-12T00:00:00.000Z'
    myDatetime = parser.parse(dateStr)
    uid = str(uuid.uuid1())
    print uid
    insert_result3 = med_table.insert({ "__v": 0,
                            "label": "上传图片",
                            "type": "test12",
                            "ignore": false,
                            "updatetime": myDatetime,
                            "createtime": myDatetime,
                            "status": 0,
                            "valid": true})
    print insert_result3
    logging.info("insert表xxxxx" + "结果:" + str(insert_result3))


# 开始update三个集合中的部分数据

table_old = db[table1]
update_table(table_old, "uid", table1, "project_id")
insert_table1(db[table1])

table_old = db[table2]
update_table(table_old, "uid", table2, "name")
insert_table2(table_old)

table_old = db[table3]
update_table(table_old,  "_id", table3, "type")
insert_table3(table_old)

相关文章

  • ocr迁移测试总结

    随着业务的增加,数据库中表(集合)太多不便于管理等问题,会出现数据库表迁移的需求,即将老数据库中的一些表迁移到新数...

  • 迁移测试总结

    本迭代由于新增了表单的状态,由以前的未完成、已完成、已通过审核变成现在的未完成、已完成、已通过审核、已完结4种状态...

  • 机器学习之识别验证码

    下载 tesseract_ocr 使用composer下载,tesseract_ocr ocr 测试 训练样本数据...

  • Confluence迁移

    测试wiki迁移 @(wiki迁移测试) 环境 wiki测试环境:172.16.100.71 Centos 6.5...

  • 2017.7.10 OCR

    祥云OCR 服务 效果不错 少量购买 5分/次 LeadTools OCR SDK 3W多,尚未测试效果

  • OCR测试方案

    一、硬件配置 IFIE919.0版本CDR硬件扫描 二、前期准备工作 1、多个业务线程同时启动,硬件开...

  • 迁移MySQL

    迁移(MySQL 5.6.44 --> 5.7.26) 搭建MySQL 5.6.44 测试环境 迁移5.6数据到5...

  • 文本对比算法的简单实现

    我们小组当前有需要要测试ocr场景文字识别的效果,给定真实文本和ocr模型预测的文本,希望给出评价指标验证模型的效...

  • android的ocr功能支持,主要用tess-two做图片文字

    操作流程如下: https://github.com/rmtheis/android-ocr 作者写的测试app,...

  • 百度 OCR 文字识别的接口测试

    OCR(Optical Character Recognition,光学字符识别),用于识别图片中的文字。本文测试...

网友评论

      本文标题:ocr迁移测试总结

      本文链接:https://www.haomeiwen.com/subject/irgcbqtx.html