这个接口自动化框架是基于unittest单元测试框架来写的,简单,还算可以用吧,也有些不合理的地方,欢迎大神指正呀!
接口自动化框架布局
image这个框架一共分为6个部分:
common:公共代码模块,包括一些公共的代码和测试用例的代码模块
conf:存放配置文件的模块,包括数据库配置信息,项目ip地址,框架所在路径,邮件的配置等
logs:存放用例执行过程中所生成的日志信息
test_data:存放测试数据(这里用的excel)
test_report:用于存放生成的测试报告
main执行文件:用于执行测试,并在测试完成后生成测试报告和发送邮件
公共代码
先看下公共代码的构成
发起http请求的类:
image这里我调用的是requests模块,可发起post请求或get请求,根据传入的method来进行判断,然后我这里直接返回json格式的数据,这个便于之后对数据的一些处理
读取配置文件的类:
image这个类我调用的configparser库,主要就是用来读取conf目录中的配置文件,这样的话就可以实现数据的分离,数据库配置信息,ip等配置就不用在代码中写死,可实现可配置化,以便于进行修改
读写execl的类:
'''
#用于读取测试数据,写出测试结果的类
from common.regular_pick import Regular
from openpyxl import load_workbook
from common.config import Config
from common.loger import Log
from conf.projectpath import *
#实例化正则表达式提取器
reg=Regular()
#日志模块实例化
log=Log()
class DoExcel():
def __init__(self,workbook_name,sheet_name):
self.workbook_name=workbook_name
self.sheet_name=sheet_name
self.init_sheet_name='init'
self.ip=Config().configer(http_path,'HTTP','ip')
#将测试的结果回写写入到excel中
def write_excel(self,row,result_data):
#打开工作薄
wb=load_workbook(self.workbook_name)
#打开表单
sh_new=wb[self.sheet_name]
sh_new.cell(row=row+1,column=10).value=result_data['actually_code']
sh_new.cell(row=row+1,column=11).value=result_data['sql_result']
sh_new.cell(row=row+1,column=12).value=result_data['result']
sh_new.cell(row=row+1,column=13).value=result_data['reson']
wb.save(self.workbook_name)
#读取初始化手机号
def read_initphone(self):
wb=load_workbook(self.workbook_name)
sh_init=wb[self.init_sheet_name]
init_phone=sh_init.cell(row=2,column=1).value
return init_phone
#将注册成功的手机号单独记录
def re_ok_tel(self,reg_tel):
wb=load_workbook(self.workbook_name)
sh_new=wb[self.init_sheet_name]
#获取最大行数,每次都追加写入
sh_new.cell(row=sh_new.max_row+1,column=2).value=reg_tel
wb.save(self.workbook_name)
#在初始化手机号使用后,对它进行自动修改
def update_initphone(self,new_phone):
wb=load_workbook(self.workbook_name)
sh_new=wb[self.init_sheet_name]
sh_new.cell(row=2,column=1).value=new_phone
wb.save(self.workbook_name)
#将测试数据读取出来
def read_excel(self,mod,case_list):
init_phone=self.read_initphone()
#打开工作薄
wb=load_workbook(self.workbook_name)
#打开excel表单
sh_new=wb[self.sheet_name]
#新建列表,用来存入读取的字典
data=[]
#获取最大行数,并进行遍历表单,先把所有数据读出来,然后再进行判断是否要读出所有用例
for i in range(1,sh_new.max_row):
#新建字典,用来读取存入的数据
dict_data={}
#根据键名来读取,与excel测试数据的名称一致
dict_data['id']=sh_new.cell(row=i+1,column=1).value
dict_data['module']=sh_new.cell(row=i+1,column=2).value
dict_data['method']=sh_new.cell(row=i+1,column=3).value
dict_data['url']=self.ip+sh_new.cell(row=i+1,column=4).value
dict_data['param']=sh_new.cell(row=i+1,column=5).value
#如果号码为phone则进行替换为初始手机号
phone=dict_data['param']
dict_data['param']=reg.regular('initphone',str(init_phone),phone)
dict_data['title']=sh_new.cell(row=i+1,column=6).value
dict_data['sql_mode']=sh_new.cell(row=i+1,column=7).value
#if sh_new.cell(row=i+1,column=8).value !=None:
#将sql中涉及到手机号参数的进行替换
dict_data['sql']=sh_new.cell(row=i+1,column=8).value
sql=dict_data['sql']
dict_data['sql']=reg.regular('initphone',str(init_phone),sql)
dict_data['excepted_code']=sh_new.cell(row=i+1,column=9).value
data.append(dict_data)
#根据mod来判断,如果mod为1则读取所有的数据,不为1则根据case_list来判断
if mod==1:
final_data=data
log.info('读取的测试数据为{0}'.format(final_data))
else:
final_data=[]
for item in data:
if item['id'] in case_list:
final_data.append(item)
log.info('读取的测试数据为{0}'.format(final_data))
#self.update_initphone(int(init_phone)+1)
return final_data
'''
这个类主要用来读写excel中的数据,因为我把测试数据都放在excel中的,然后在发起http请求之前,就调用的的方法将测试数据读取出来,进行发起请求,然后在测试完成后将结果回写到excel中;这样做的目的是,对于想要测试的接口,可以直接在excel中进行添加和管理,就不用单独再去改代码加测试用例了,比较的方便吧。
就是这个样子的:
image.png
进行单元测试的类:
'''
#写测试用例的类
#首先引入单元测试框架,继承使用TestCase编写测试用例
#然后进行初始化处理,发起http请求,
#进行断言,并将测试结果写入到excel
import unittest
#引入ddt装饰器,使用数据驱动模式直接从excel中读取测试用例
from ddt import ddt,data
from common.http_request import Request
from common.test_case_set import *
@ddt#引入ddt
class Test_Case(unittest.TestCase):
#进行初始化处理
def setUp(self):
log.info('--------开始测试--------')
@data(*test_data)
#@unpack
def test_cases(self,dict_item):#将读取的数据存为字典
log.info('***********************')
log.info('发起请求的测试数据为{0}'.format(dict_item))
log.info('正在进行第{0}用例测试,用例标题为:{1}'.format(dict_item['id'],dict_item['title']))
result_data={}
#参数定义
#定义check_sql
check_sql=dict_item['sql']
#进行参数替换
#将手机号进行参数替换
if s.MOBILE_PHONE !='':
params=reg.regular('reg_tel',s.MOBILE_PHONE,dict_item['param'])
#如果为投资接口则把手机号赋值到期望结果中
excepted=reg.regular('reg_tel',s.MOBILE_PHONE,dict_item['excepted_code'])
#将memberid进行参数替换
if s.MEMBERID !='' :
#期望结果中的手机号
excepted=reg.regular('member_id',str(s.MEMBERID),excepted)
#投资的memberid替换
params=reg.regular('member_id',str(s.MEMBERID),params)
#投资的sql中的memberid进行替换
check_sql=reg.regular('member_id',str(s.MEMBERID),check_sql)
#将充值前的余额查出来
if s.LEVAL_AMOUNT ==None and dict_item['module']=='recharge' :
s.LEVAL_AMOUNT=db.db_operation("select leaveamount from member where mobilephone="+s.MOBILE_PHONE)[0]
lamount=str('%.2f'%(float(s.LEVAL_AMOUNT)+float(eval(params)['amount'])))
excepted=reg.regular('leave_amount',lamount,excepted)
else:
params=dict_item['param']
excepted=dict_item['excepted_code']
check_sql=dict_item['sql']
if dict_item['module']=='bidLoan':
#将投资前的用户余额查出来
s.STARTAMOUNT=db.db_operation(eval(check_sql)['my_sql'][0])[0]
#将投资前标的已投总额查出来
s.STARTINVAMOUNT=db.db_operation(eval(check_sql)['my_sql'][1])[0]
#发起http请求
re=Request().httprequest(dict_item['url'],eval(params),dict_item['method'])
log.info('请求结果为{0}'.format(re))
#注意,在将字典写入excel中时,要加format
result_data['actually_code']=format(re)
result_data['reson']=re['msg']
#将需要用到的变量存储再全局变量中
if re['msg'] =='注册成功':
#将注册成功的手机号储存在全局变量中
s.MOBILE_PHONE=str(eval(params)['mobilephone'])
#将注册成功的手机号记录在excel
t.re_ok_tel(s.MOBILE_PHONE)
#将用户id查询出来
member_id=db.db_operation("select Id from member where mobilephone="+s.MOBILE_PHONE)[0]
s.MEMBERID=member_id
if re['msg']=='充值成功':
#将注册时间进行替换
reg_time=re['data']['regtime']
excepted=reg.regular('reg_time',reg_time,excepted)
if re['msg']=='竞标成功':
#获取投资成功后的用户余额
s.ENDAMOUNT=db.db_operation(eval(check_sql)['my_sql'][0])[0]
#获取投资成功后的该标的投资总额
s.ENDINVAMOUNT=db.db_operation(eval(check_sql)['my_sql'][1])[0]
#进行数据库检查
#通过sql_mode来判断,如果为1则检查,
if dict_item['sql_mode']==1:
if dict_item['module']=='bidLoan':
#用户实际投资金额为
acct_loan_amount=s.STARTAMOUNT-s.ENDAMOUNT
log.info('用户实际投资金额为{0}'.format(acct_loan_amount))
#用户实际投资记录为
acct_inv_amount=s.ENDINVAMOUNT-s.STARTINVAMOUNT
log.info('用户实际投资记录为{0}'.format(acct_inv_amount))
#投资后标的状态为
loan_status=db.db_operation(eval(check_sql)['my_sql'][2])[0]
log.info('用户实际投资记录为{0}'.format(loan_status))
#数据库期望结果
excepted_sql_result=eval(check_sql)['result']
try:
#用户的实际投资金额对比
self.assertEqual(float(excepted_sql_result[0]),float(acct_loan_amount))
#用户投资后的余额对比
self.assertEqual(float(excepted_sql_result[0]),float(acct_inv_amount))
#用户投资后标的状态对比
self.assertEqual(float(excepted_sql_result[1]),float(loan_status))
result_data['sql_result']='PASS'
log.debug('数据库检查结果为:{0}'.format(result_data['sql_result']))
except AssertionError as a:
log.debug('数据库比对报错:{0}'.format(a))
result_data['sql_result']='FAIL'
result_data['result']='FAIL'
t.write_excel(dict_item['id'],result_data)
raise a
else:
#查询数据库,是否存在新增的数据,读出来是个元组类型的
acctually_sql=db.db_operation(eval(check_sql)['my_sql'])[0]
log.debug('查询数据库结果为:{0}'.format(acctually_sql))
#数据库期望结果
excepted_sql_result=eval(check_sql)['result']
try:
self.assertEqual(excepted_sql_result,acctually_sql)
result_data['sql_result']='PASS'
log.debug('数据库检查结果为:{0}'.format(result_data['sql_result']))
except AssertionError as a:#这里不抛出异常,以便程序继续执行
log.debug('数据库比对报错:{0}'.format(a))
result_data['sql_result']='FAIL'
result_data['result']='FAIL'
#result_data['sql_result']='FAIL'
#这里直接将结果写入
t.write_excel(dict_item['id'],result_data)
raise a
else:
result_data['sql_result']='NoNeedCheck'
#进行测试结果比较断言
try :
self.assertEqual(eval(excepted),eval(result_data['actually_code']))
log.info('期望结果为:{0}'.format(excepted))
result_data['result']='PASS'
log.debug('断言检查结果为{0}'.format(result_data['result']))
except Exception as e:
result_data['result']='FAIL'
log.info('期望结果为:{0}'.format(excepted))
log.debug('返回报错!{0}'.format(e))
raise e
#测试结果回写
finally:#无论结果怎样都要将测试结果写回到excel中
log.info('******开始写入数据******')
t.write_excel(dict_item['id'],result_data)
log.info('******结束写入数据******')
def tearDown(self):
log.info('--------结束测试--------')
'''
这个就是继承了unittest框架的TestCase类来写的测试用例,然后还用到了ddt装饰器,因为之前我使用读写excel的类,将测试数据都是以列表嵌套字典的方式读出来的,然后ddt的作用就是可以给一个用例传入不同的参数,然后每个运行一遍,这样的话就相当于运行了多个测试用例,但实际上我只写了一个test_case;这样就实现了单例模式,这也算是用excel管理测试用例的好处吧。
利用正则表达式进行参数替换的类
image.png
因为在测试的时候涉及到对测试数据的参数化配置,这就会涉及到参数的替换,所以我单独写了一个类来进行参数替换,使用正则表达式的方法,调用起来比较的方便,比用replace的方法要方便一点。
最后就是main执行文件的类:
'''
#用于加载并执行测试用例并生成测试报告
import unittest
import time
import HTMLTestRunnerNew
from common.test_case import Test_Case
from common.sendemail import Sendemail
from common.config import Config
from conf.projectpath import *
#添加测试集
suite=unittest.TestSuite()
#通过指定的测试类,加载测试用例
loader=unittest.TestLoader()
loader.loadTestsFromTestCase(Test_Case)
#将用例添加到测试集
suite.addTest(loader.loadTestsFromTestCase(Test_Case))
#将测试结果写入到文件中
now=time.strftime('%y-%m-%d_%H_%M_%S')
#命名一个html文件,为了避免名称不重复所以加上时间戳
filepath=test_report+'\API_test'+now+'.html'
#利用html模板导出测试报告
with open(filepath,'wb') as file:
runner=HTMLTestRunnerNew.HTMLTestRunner(stream=file,verbosity=2,title='api_test',description=None,tester='wang')
#执行测试用例
runner.run(suite)
#将测试报告通过邮件发送出去
email_to=Config().configer(email_path,'EMAIL','email_to')
sendmail=Sendemail()
#sendmail.send_email(email_to,filepath)
'''
这里我先调用TestLoader来集成测试用例,然后用TestSuite来集成测试用例,然后在将测试报告用html模板导出,这样的话运行完成后就可以在web页面打开测试报告进行查看测试结果,最后还添加了发送邮件的方法,可以将测试报告发送到个人邮箱。
我目前能做的就这些了,总的来说还是比较简易的,还有一些东西需要完善,比如部署到jenkins集成进行定时调度;代码层面也有很多地方需要优化,希望各位大神提出来;
第一次写,排版有点乱,不太会搞,莫怪莫怪!
网友评论