python接口自动化框架整理

作者: 索尔_de84 | 来源:发表于2019-01-30 23:17 被阅读349次

    这个接口自动化框架是基于unittest单元测试框架来写的,简单,还算可以用吧,也有些不合理的地方,欢迎大神指正呀!

    接口自动化框架布局

    image

    这个框架一共分为6个部分:

    common:公共代码模块,包括一些公共的代码和测试用例的代码模块

    conf:存放配置文件的模块,包括数据库配置信息,项目ip地址,框架所在路径,邮件的配置等

    logs:存放用例执行过程中所生成的日志信息

    test_data:存放测试数据(这里用的excel)

    test_report:用于存放生成的测试报告

    main执行文件:用于执行测试,并在测试完成后生成测试报告和发送邮件

    公共代码

    先看下公共代码的构成

    发起http请求的类:

    image

    这里我调用的是requests模块,可发起post请求或get请求,根据传入的method来进行判断,然后我这里直接返回json格式的数据,这个便于之后对数据的一些处理

    读取配置文件的类:

    image

    这个类我调用的configparser库,主要就是用来读取conf目录中的配置文件,这样的话就可以实现数据的分离,数据库配置信息,ip等配置就不用在代码中写死,可实现可配置化,以便于进行修改

    读写execl的类:

    '''
        #用于读取测试数据,写出测试结果的类
        from common.regular_pick import Regular
        from openpyxl import load_workbook
        from common.config import Config
        from common.loger import Log
        from conf.projectpath import *
        #实例化正则表达式提取器
        reg=Regular()
        #日志模块实例化
        log=Log()
        class DoExcel():
          def __init__(self,workbook_name,sheet_name):
                self.workbook_name=workbook_name
                self.sheet_name=sheet_name
                self.init_sheet_name='init'
                self.ip=Config().configer(http_path,'HTTP','ip')
      #将测试的结果回写写入到excel中
        def write_excel(self,row,result_data):
            #打开工作薄
              wb=load_workbook(self.workbook_name)
            #打开表单
              sh_new=wb[self.sheet_name]
              sh_new.cell(row=row+1,column=10).value=result_data['actually_code']
              sh_new.cell(row=row+1,column=11).value=result_data['sql_result']
              sh_new.cell(row=row+1,column=12).value=result_data['result']
              sh_new.cell(row=row+1,column=13).value=result_data['reson']
              wb.save(self.workbook_name)
       #读取初始化手机号
        def read_initphone(self):
            wb=load_workbook(self.workbook_name)
            sh_init=wb[self.init_sheet_name]
            init_phone=sh_init.cell(row=2,column=1).value
            return init_phone
        #将注册成功的手机号单独记录
        def re_ok_tel(self,reg_tel):
              wb=load_workbook(self.workbook_name)
            sh_new=wb[self.init_sheet_name]
          #获取最大行数,每次都追加写入
            sh_new.cell(row=sh_new.max_row+1,column=2).value=reg_tel
            wb.save(self.workbook_name)
        #在初始化手机号使用后,对它进行自动修改
        def update_initphone(self,new_phone):
            wb=load_workbook(self.workbook_name)
            sh_new=wb[self.init_sheet_name]
            sh_new.cell(row=2,column=1).value=new_phone
            wb.save(self.workbook_name)
        #将测试数据读取出来
         def read_excel(self,mod,case_list):
            init_phone=self.read_initphone()
            #打开工作薄
            wb=load_workbook(self.workbook_name)
            #打开excel表单
            sh_new=wb[self.sheet_name]
            #新建列表,用来存入读取的字典
            data=[]
            #获取最大行数,并进行遍历表单,先把所有数据读出来,然后再进行判断是否要读出所有用例
            for i in range(1,sh_new.max_row):
                #新建字典,用来读取存入的数据
                dict_data={}
                #根据键名来读取,与excel测试数据的名称一致
                dict_data['id']=sh_new.cell(row=i+1,column=1).value
                dict_data['module']=sh_new.cell(row=i+1,column=2).value
                dict_data['method']=sh_new.cell(row=i+1,column=3).value
                dict_data['url']=self.ip+sh_new.cell(row=i+1,column=4).value
                dict_data['param']=sh_new.cell(row=i+1,column=5).value
                #如果号码为phone则进行替换为初始手机号
                phone=dict_data['param']
                dict_data['param']=reg.regular('initphone',str(init_phone),phone)
                dict_data['title']=sh_new.cell(row=i+1,column=6).value
                dict_data['sql_mode']=sh_new.cell(row=i+1,column=7).value
                #if sh_new.cell(row=i+1,column=8).value !=None:
                    #将sql中涉及到手机号参数的进行替换
                dict_data['sql']=sh_new.cell(row=i+1,column=8).value
                sql=dict_data['sql']
                dict_data['sql']=reg.regular('initphone',str(init_phone),sql)
                dict_data['excepted_code']=sh_new.cell(row=i+1,column=9).value
                data.append(dict_data)
            #根据mod来判断,如果mod为1则读取所有的数据,不为1则根据case_list来判断
            if mod==1:
               final_data=data
                log.info('读取的测试数据为{0}'.format(final_data))
            else:
                final_data=[]
                for item in data:
                    if item['id'] in case_list:
                        final_data.append(item)
                        log.info('读取的测试数据为{0}'.format(final_data))
            #self.update_initphone(int(init_phone)+1)
            return final_data
           '''
    

    这个类主要用来读写excel中的数据,因为我把测试数据都放在excel中的,然后在发起http请求之前,就调用的的方法将测试数据读取出来,进行发起请求,然后在测试完成后将结果回写到excel中;这样做的目的是,对于想要测试的接口,可以直接在excel中进行添加和管理,就不用单独再去改代码加测试用例了,比较的方便吧。
    就是这个样子的:


    image.png

    进行单元测试的类:

    '''
      #写测试用例的类
      #首先引入单元测试框架,继承使用TestCase编写测试用例
      #然后进行初始化处理,发起http请求,
      #进行断言,并将测试结果写入到excel
      import unittest
      #引入ddt装饰器,使用数据驱动模式直接从excel中读取测试用例
      from ddt import ddt,data
      from common.http_request import Request
      from common.test_case_set import *
      @ddt#引入ddt
      class Test_Case(unittest.TestCase):
        #进行初始化处理
        def setUp(self):
            log.info('--------开始测试--------')
        @data(*test_data)
        #@unpack
        def test_cases(self,dict_item):#将读取的数据存为字典
            log.info('***********************')
            log.info('发起请求的测试数据为{0}'.format(dict_item))
            log.info('正在进行第{0}用例测试,用例标题为:{1}'.format(dict_item['id'],dict_item['title']))
            result_data={}
            #参数定义
            #定义check_sql
            check_sql=dict_item['sql']
            #进行参数替换
            #将手机号进行参数替换
            if s.MOBILE_PHONE !='':
                params=reg.regular('reg_tel',s.MOBILE_PHONE,dict_item['param'])
                #如果为投资接口则把手机号赋值到期望结果中
                excepted=reg.regular('reg_tel',s.MOBILE_PHONE,dict_item['excepted_code'])
            #将memberid进行参数替换
                if s.MEMBERID !='' :
                    #期望结果中的手机号
                    excepted=reg.regular('member_id',str(s.MEMBERID),excepted)
                    #投资的memberid替换
                    params=reg.regular('member_id',str(s.MEMBERID),params)
                    #投资的sql中的memberid进行替换
                    check_sql=reg.regular('member_id',str(s.MEMBERID),check_sql)
            #将充值前的余额查出来
                    if s.LEVAL_AMOUNT ==None and dict_item['module']=='recharge' :
                        s.LEVAL_AMOUNT=db.db_operation("select leaveamount from member where mobilephone="+s.MOBILE_PHONE)[0]
                        lamount=str('%.2f'%(float(s.LEVAL_AMOUNT)+float(eval(params)['amount'])))
                        excepted=reg.regular('leave_amount',lamount,excepted)
            else:
                params=dict_item['param']
                excepted=dict_item['excepted_code']
                check_sql=dict_item['sql']
            if dict_item['module']=='bidLoan':
                #将投资前的用户余额查出来
                s.STARTAMOUNT=db.db_operation(eval(check_sql)['my_sql'][0])[0]
                #将投资前标的已投总额查出来
                s.STARTINVAMOUNT=db.db_operation(eval(check_sql)['my_sql'][1])[0]
            #发起http请求
            re=Request().httprequest(dict_item['url'],eval(params),dict_item['method'])
            log.info('请求结果为{0}'.format(re))
            #注意,在将字典写入excel中时,要加format
            result_data['actually_code']=format(re)
            result_data['reson']=re['msg']
            #将需要用到的变量存储再全局变量中
            if re['msg'] =='注册成功':
                #将注册成功的手机号储存在全局变量中
                s.MOBILE_PHONE=str(eval(params)['mobilephone'])
                #将注册成功的手机号记录在excel
                t.re_ok_tel(s.MOBILE_PHONE)
                #将用户id查询出来
                member_id=db.db_operation("select Id from member where mobilephone="+s.MOBILE_PHONE)[0]
                s.MEMBERID=member_id
            if re['msg']=='充值成功':
                #将注册时间进行替换
                reg_time=re['data']['regtime']
                excepted=reg.regular('reg_time',reg_time,excepted)
            if re['msg']=='竞标成功':
                #获取投资成功后的用户余额
                s.ENDAMOUNT=db.db_operation(eval(check_sql)['my_sql'][0])[0]
                #获取投资成功后的该标的投资总额
                s.ENDINVAMOUNT=db.db_operation(eval(check_sql)['my_sql'][1])[0]
            #进行数据库检查
            #通过sql_mode来判断,如果为1则检查,
            if dict_item['sql_mode']==1:
                if dict_item['module']=='bidLoan':
                    #用户实际投资金额为
                    acct_loan_amount=s.STARTAMOUNT-s.ENDAMOUNT
                    log.info('用户实际投资金额为{0}'.format(acct_loan_amount))
                    #用户实际投资记录为
                    acct_inv_amount=s.ENDINVAMOUNT-s.STARTINVAMOUNT
                    log.info('用户实际投资记录为{0}'.format(acct_inv_amount))
                    #投资后标的状态为
                    loan_status=db.db_operation(eval(check_sql)['my_sql'][2])[0]
                    log.info('用户实际投资记录为{0}'.format(loan_status))
                    #数据库期望结果
                    excepted_sql_result=eval(check_sql)['result']
                    try:
                        #用户的实际投资金额对比
                        self.assertEqual(float(excepted_sql_result[0]),float(acct_loan_amount))
                        #用户投资后的余额对比
                        self.assertEqual(float(excepted_sql_result[0]),float(acct_inv_amount))
                        #用户投资后标的状态对比
                        self.assertEqual(float(excepted_sql_result[1]),float(loan_status))
                        result_data['sql_result']='PASS'
                        log.debug('数据库检查结果为:{0}'.format(result_data['sql_result']))
                    except AssertionError as a:
                        log.debug('数据库比对报错:{0}'.format(a))
                        result_data['sql_result']='FAIL'
                        result_data['result']='FAIL'
                        t.write_excel(dict_item['id'],result_data)
                        raise a
                else:
                    #查询数据库,是否存在新增的数据,读出来是个元组类型的
                    acctually_sql=db.db_operation(eval(check_sql)['my_sql'])[0]
                    log.debug('查询数据库结果为:{0}'.format(acctually_sql))
                    #数据库期望结果
                    excepted_sql_result=eval(check_sql)['result']
                    try:
                        self.assertEqual(excepted_sql_result,acctually_sql)
                        result_data['sql_result']='PASS'
                        log.debug('数据库检查结果为:{0}'.format(result_data['sql_result']))
                    except AssertionError as a:#这里不抛出异常,以便程序继续执行
                        log.debug('数据库比对报错:{0}'.format(a))
                        result_data['sql_result']='FAIL'
                        result_data['result']='FAIL'
                        #result_data['sql_result']='FAIL'
                        #这里直接将结果写入
                        t.write_excel(dict_item['id'],result_data)
                        raise a
            else:
                result_data['sql_result']='NoNeedCheck'
    
            #进行测试结果比较断言
            try :
                self.assertEqual(eval(excepted),eval(result_data['actually_code']))
                log.info('期望结果为:{0}'.format(excepted))
                result_data['result']='PASS'
                log.debug('断言检查结果为{0}'.format(result_data['result']))
            except Exception as e:
                result_data['result']='FAIL'
                log.info('期望结果为:{0}'.format(excepted))
                log.debug('返回报错!{0}'.format(e))
                raise e
            #测试结果回写
            finally:#无论结果怎样都要将测试结果写回到excel中
                log.info('******开始写入数据******')
                t.write_excel(dict_item['id'],result_data)
                log.info('******结束写入数据******')
        def tearDown(self):
            log.info('--------结束测试--------')
    '''
    

    这个就是继承了unittest框架的TestCase类来写的测试用例,然后还用到了ddt装饰器,因为之前我使用读写excel的类,将测试数据都是以列表嵌套字典的方式读出来的,然后ddt的作用就是可以给一个用例传入不同的参数,然后每个运行一遍,这样的话就相当于运行了多个测试用例,但实际上我只写了一个test_case;这样就实现了单例模式,这也算是用excel管理测试用例的好处吧。
    利用正则表达式进行参数替换的类


    image.png

    因为在测试的时候涉及到对测试数据的参数化配置,这就会涉及到参数的替换,所以我单独写了一个类来进行参数替换,使用正则表达式的方法,调用起来比较的方便,比用replace的方法要方便一点。
    最后就是main执行文件的类:

    '''
    #用于加载并执行测试用例并生成测试报告
    import unittest
    import time
    import HTMLTestRunnerNew
    from common.test_case import Test_Case
    from common.sendemail import Sendemail
    from common.config import Config
    from conf.projectpath import *
    #添加测试集
    suite=unittest.TestSuite()
    #通过指定的测试类,加载测试用例
    loader=unittest.TestLoader()
    loader.loadTestsFromTestCase(Test_Case)
    #将用例添加到测试集
    suite.addTest(loader.loadTestsFromTestCase(Test_Case))
    #将测试结果写入到文件中
    now=time.strftime('%y-%m-%d_%H_%M_%S')
    #命名一个html文件,为了避免名称不重复所以加上时间戳
    filepath=test_report+'\API_test'+now+'.html'
    #利用html模板导出测试报告
    with open(filepath,'wb') as file:
        runner=HTMLTestRunnerNew.HTMLTestRunner(stream=file,verbosity=2,title='api_test',description=None,tester='wang')
     #执行测试用例
     runner.run(suite)
    #将测试报告通过邮件发送出去
    email_to=Config().configer(email_path,'EMAIL','email_to')
    sendmail=Sendemail()
    #sendmail.send_email(email_to,filepath)
    '''
    

    这里我先调用TestLoader来集成测试用例,然后用TestSuite来集成测试用例,然后在将测试报告用html模板导出,这样的话运行完成后就可以在web页面打开测试报告进行查看测试结果,最后还添加了发送邮件的方法,可以将测试报告发送到个人邮箱。
    我目前能做的就这些了,总的来说还是比较简易的,还有一些东西需要完善,比如部署到jenkins集成进行定时调度;代码层面也有很多地方需要优化,希望各位大神提出来;
    第一次写,排版有点乱,不太会搞,莫怪莫怪!

    相关文章

      网友评论

        本文标题:python接口自动化框架整理

        本文链接:https://www.haomeiwen.com/subject/ksdhsqtx.html