本接口自动化框架主要通过读取Excel文件中的请求参数及预期结果来判断接口的正确性,代码中有部分内容参考了其他大神的实现。
一、结构目录总览

二、列出各个脚本的代码
1. test_case文件夹下面的start_login.py, 这个文件主要是编写测试脚本。
from tool.readExcel import readExcel
from tool.common import PublicMethod
import unittest
import const
from readConfig import ReadConfig
from tool.Logger import Log
# Shared configuration reader; setUp() uses it to look up the workbook name and base URL.
read_config = ReadConfig()
# Shared logger wrapper used by the test below.
log = Log()
class testLoginApi(unittest.TestCase):
    """Data-driven API test: each data row of the Excel workbook is one request to verify."""

    def setUp(self):
        """Read the workbook file name and the base URL from the shared configuration."""
        self.file = read_config.get_test_file('test_file')
        self.URL_prefix = read_config.get_http("scm_url")

    @staticmethod
    def _parse_expected(raw):
        """Convert the Excel ``msg`` cell into a dict of expected response fields.

        Args:
            raw: Cell value; a JSON object or Python dict literal, or blank.

        Returns:
            The parsed value, or an empty dict when the cell is blank.

        Raises:
            ValueError, SyntaxError: If the cell is neither valid JSON nor a
                Python literal.
        """
        import ast
        import json

        text = '' if raw is None else str(raw).strip()
        if not text:
            return {}
        try:
            return json.loads(text)
        except ValueError:
            # literal_eval only accepts literals, so a cell can no longer run
            # arbitrary code the way the previous eval() call allowed.
            return ast.literal_eval(text)

    def test_login(self):
        """Call every API listed in the workbook and verify status code and expected fields.

        For each row:
        1. the HTTP status code must equal the ``status_code`` column;
        2. every key/value in the ``msg`` column must appear in the response
           JSON, either at the top level or under ``data``;
        3. a blank ``msg`` cell means "nothing to verify" and is not an error.
        """
        excel = readExcel(const.TESTFILE + '/' + self.file)
        names = excel.getName
        payloads = excel.getData
        expected_msgs = excel.getMsg
        urls = excel.getUrl
        methods = excel.getMethod
        case_ids = excel.getId
        status_codes = excel.getStatusCode

        for i, case_name in enumerate(names):
            log.info("开始执行【{}】脚本".format(case_name))
            api = PublicMethod(methods[i], self.URL_prefix + urls[i], payloads[i])
            # Store the authorization token in the config file before the call.
            api.save_authorization()
            res = api.methodAPI()
            apistatus = api.getStatusCode(res)
            apijson = api.getJson(res)

            # 1. The returned status code must match the workbook.
            self.assertEqual(apistatus, int(status_codes[i]), msg='状态匹配不成功')
            log.info('{}.{}:状态测试成功'.format(case_ids[i], case_name))

            # 2./3. Check the expected fields of THIS row; the previous check
            # looked at the length of the whole column, so a blank cell in any
            # row still reached eval('') and crashed.
            expected = self._parse_expected(expected_msgs[i])
            if expected:
                self.assertIsInstance(expected, dict, msg='Excel中的msg不是字典格式')
                self.assertIsInstance(apijson, dict, msg='接口没有返回JSON对象')
                nested = apijson.get('data')
                if not isinstance(nested, dict):
                    # No usable "data" object: only top-level keys can match.
                    nested = {}
                for key, expected_value in expected.items():
                    self.assertTrue(key in apijson or key in nested, msg="没有找到要对比的key")
                    actual = apijson[key] if key in apijson else nested[key]
                    self.assertEqual(expected_value, actual, msg='Excel提取的参数跟返回json字段值不相同')
                    log.info('{}.{}:返回的字段[{}]验证成功'.format(case_ids[i], case_name, key))
            else:
                log.info('没有需要验证的参数')
            log.info("*" * 30)
            log.info("结束【{}】脚本测试".format(case_name))
            log.info("-" * 30)
2. tool文件夹下面的common.py文件,这个主要是编写一些公共方法
import requests
import json
from readConfig import ReadConfig
from tool.Logger import Log
# Shared logger wrapper used by every method of PublicMethod.
log = Log()
# Shared configuration reader; used to load and store the authorization header.
readconfig = ReadConfig()
class PublicMethod(object):
    """Send one API request described by an Excel row and expose parts of the response."""

    # Seconds to wait for the server; without a timeout a hung endpoint blocks the whole run.
    REQUEST_TIMEOUT = 10

    def __init__(self, method, url, data):
        """Store the request description.

        Args:
            method: HTTP verb from the workbook ('post' or 'get', any letter case).
            url: Full request URL.
            data: Request parameters as a JSON / dict-literal string (may be blank).
        """
        self.method = method
        self.url = url
        self.data = data

    @property
    def headers(self):
        """Request headers: JSON content type plus the stored Authorization token."""
        authorization = readconfig.get_headers('authorization')
        # The backend decides whether the user is logged in from this header.
        return {
            "Content-Type": 'application/json',
            'Authorization': authorization,
        }

    @staticmethod
    def _parse_payload(raw):
        """Parse the workbook ``data`` cell into a Python object without using eval().

        Returns ``None`` for a blank cell and non-string values unchanged.
        Raises ValueError/SyntaxError when the text is neither JSON nor a
        Python literal.
        """
        import ast

        if not isinstance(raw, str):
            return raw
        text = raw.strip()
        if not text:
            return None
        try:
            return json.loads(text)
        except ValueError:
            return ast.literal_eval(text)

    def methodAPI(self):
        """Send the request with the configured HTTP method.

        Returns:
            The ``requests`` response, or ``None`` when the method is not
            supported or the call fails (the reason is logged).
        """
        verb = str(self.method).strip().lower()
        if verb not in ('post', 'get'):
            # Previously this case was only detected by accident, through an
            # UnboundLocalError on ``res``.
            log.error('接口调用method不存在')
            return None
        try:
            if verb == 'post':
                payload = self._parse_payload(self.data)
                body = None if payload is None else json.dumps(payload)
                res = requests.post(self.url, data=body, headers=self.headers,
                                    timeout=self.REQUEST_TIMEOUT)
                log.info("调用【 post 】方式")
            else:
                try:
                    params = self._parse_payload(self.data)
                except (ValueError, SyntaxError):
                    # Not JSON: keep sending the raw string as the query, as before.
                    params = self.data
                # GET requests need the Authorization header too.
                res = requests.get(self.url, params=params, headers=self.headers,
                                   timeout=self.REQUEST_TIMEOUT)
                log.info("调用【 get 】方式")
            return res
        except Exception as e:
            # Best-effort boundary: log and let the caller see None.
            log.error('接口调用失败')
            log.error(str(e))
            return None

    def getStatusCode(self, res):
        """Return ``res.status_code``, or ``None`` (after logging) when ``res`` is unusable."""
        try:
            status_code = res.status_code
            log.info("获取返回信息status_code【" + str(status_code) + "】")
            return status_code
        except AttributeError as e:
            log.error('获取status_code失败')
            log.error(str(e))
            return None

    def getJson(self, res):
        """Return the decoded JSON body, or ``None`` (after logging) when it cannot be decoded."""
        try:
            json_data = res.json()
            log.info("成功获取返回信息json_data")
            return json_data
        except (AttributeError, ValueError) as e:
            log.error('获取json_data失败')
            log.error(str(e))
            return None

    def save_authorization(self):
        """Log in, read the JWT from the response and store it in the config file.

        Failures are logged and swallowed so the calling test can continue.
        """
        # NOTE(review): URL and credentials are hard-coded placeholders; the
        # config file appears to define login URL/user/password keys - confirm
        # and read them from there.
        try:
            data = {"username": "xxx", "password": "xxx"}
            res = requests.post("https://baidu.com", data=data, timeout=self.REQUEST_TIMEOUT)
            authorization = res.json()['data']['jwt']
            readconfig.set_headers('authorization', authorization)
            log.info("成功存储参数到配置文件")
        except Exception as e:
            log.error('获取authorization失败')
            log.error(str(e))
3. tool文件夹下面的db_model.py文件,编写ORM的类文件,目前这个没啥用,只是放着以后用
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from readConfig import ReadConfig
from sqlalchemy import Column,String,Integer
"""获取DB配置信息"""
from urllib.parse import quote_plus

# get_db() is an instance method; calling it on the class itself raised a
# TypeError at import time, so read the settings through an instance.
_db_config = ReadConfig()
host = _db_config.get_db('host')
username = _db_config.get_db('username')
password = _db_config.get_db('password')
database = _db_config.get_db('database')

# Build the connection URL from the configured values instead of hard-coding
# credentials. User name and password are URL-quoted so characters such as
# '@' or ':' cannot corrupt the URL.
engine = create_engine(
    'mysql+pymysql://{}:{}@{}:3306/{}'.format(
        quote_plus(username), quote_plus(password), host, database
    )
)
Base = declarative_base()


class EMPLOYEE(Base):
    """ORM mapping for the ``employee`` table."""

    __tablename__ = 'employee'

    id = Column(Integer, primary_key=True)
    first_name = Column(String(64))
    last_name = Column(String(64))
    age = Column(Integer)
    sex = Column(String(32))
    income = Column(String(64))


# Create the mapped tables if they do not exist yet (runs at import time).
Base.metadata.create_all(engine)
4. tool文件夹下面的db_option.py文件,该文件主要用来封装DB操作的方法,目前暂时没有用到。
from sqlalchemy.orm import sessionmaker
from . import db_model
from .db_model import EMPLOYEE
class Option_DB(object):
    """Small helper around a SQLAlchemy session for the ``EMPLOYEE`` table."""

    # Session factory bound to the engine defined in db_model.
    DBSession = sessionmaker(bind=db_model.engine)
    # One session shared by all instances (created when the class is defined).
    session = DBSession()

    def insert(self, first_name, last_name, age, sex, income):
        """Insert one employee row and commit it.

        Raises:
            Exception: Whatever ``commit()`` raised; the session is rolled
                back first so it stays usable for later calls.
        """
        new_employee = EMPLOYEE(first_name=first_name, last_name=last_name, age=age, sex=sex, income=income)
        self.session.add(new_employee)
        try:
            self.session.commit()
        except Exception:
            # A failed commit leaves the shared session in a state where every
            # later operation fails until rollback() is called.
            self.session.rollback()
            raise

    def query(self, param):
        """Return the first employee whose ``first_name`` equals ``param``, or ``None``."""
        return self.session.query(EMPLOYEE).filter(EMPLOYEE.first_name == param).first()
if __name__ == '__main__':
    # Manual smoke test: look up one employee by first name.
    db = Option_DB()
    # db.insert('wang', 'sads', 18, 'F', '14000')
    employee = db.query('zhang')
    # query() returns None when no row matches; accessing .first_name on it
    # raised AttributeError before.
    if employee is None:
        print('no employee found')
    else:
        print(employee.first_name)
5. tool文件夹下面的Logger.py文件,配置log
import logging, time, os, const
# Local directory where log files are saved (taken from const.LOGPATH).
log_path = const.LOGPATH
class Log(object):
    """Logger wrapper that writes each message to a dated log file and to the console.

    Handlers are attached to the root logger only for the duration of one
    call and removed afterwards, so several ``Log`` instances do not produce
    duplicated output.
    """

    def __init__(self):
        # Make sure the log directory exists; FileHandler does not create it.
        os.makedirs(log_path, exist_ok=True)
        # One log file per day, e.g. 2024_01_31.log
        self.logname = os.path.join(log_path, '%s.log' % time.strftime('%Y_%m_%d'))
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        # Output format of every record.
        # NOTE(review): %(filename)s reports the file of the logging call made
        # inside this class, not the file of the code that called Log.info().
        self.formatter = logging.Formatter('[%(asctime)s] - %(filename)s - %(levelname)s: %(message)s')

    def __console(self, level, message):
        """Emit ``message`` at ``level`` to the log file and the console.

        Args:
            level: One of 'debug', 'info', 'warning', 'error'; any other value
                is ignored.
            message: Text to log.
        """
        emit = {
            'info': self.logger.info,
            'debug': self.logger.debug,
            'warning': self.logger.warning,
            'error': self.logger.error,
        }.get(level)
        if emit is None:
            return

        # File handler in append mode; explicit UTF-8 so non-ASCII messages
        # are written identically on every platform.
        fh = logging.FileHandler(self.logname, 'a', encoding='utf-8')
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(self.formatter)
        # Console handler.
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(self.formatter)

        self.logger.addHandler(fh)
        self.logger.addHandler(ch)
        try:
            emit(message)
        finally:
            # Always detach and close, even if logging raised; otherwise the
            # handlers pile up on the root logger and output is duplicated.
            self.logger.removeHandler(ch)
            self.logger.removeHandler(fh)
            fh.close()
            ch.close()

    def debug(self, message):
        """Log ``message`` at DEBUG level."""
        self.__console('debug', message)

    def info(self, message):
        """Log ``message`` at INFO level."""
        self.__console('info', message)

    def warning(self, message):
        """Log ``message`` at WARNING level."""
        self.__console('warning', message)

    def error(self, message):
        """Log ``message`` at ERROR level."""
        self.__console('error', message)
6. tool文件夹下面的readExcel.py文件,读取Excel文件信息
import xlrd
class readExcel(object):
    """Read API test cases from the first sheet of an Excel workbook.

    Row 0 is treated as the header row; every following row is one test case.
    All accessors are properties, so they are read without parentheses
    (``excel.getName``).

    NOTE(review): xlrd 2.x only reads ``.xls`` files - confirm the installed
    xlrd version supports the workbook format in use.
    """

    # Column positions inside the sheet.
    _COL_ID = 0
    _COL_NAME = 1
    _COL_DATA = 2
    _COL_URL = 3
    _COL_METHOD = 4
    _COL_STATUS_CODE = 5
    _COL_CODE = 6
    _COL_MSG = 7

    def __init__(self, path):
        """Remember the workbook path; the file is opened on first access."""
        self.path = path
        # Cached sheet. Previously every property access re-opened the
        # workbook, i.e. the file was parsed twice per row and column.
        self._sheet = None

    @property
    def getSheet(self):
        """The first sheet of the workbook (opened once, then cached)."""
        if self._sheet is None:
            workbook = xlrd.open_workbook(self.path)
            self._sheet = workbook.sheet_by_index(0)
        return self._sheet

    @property
    def getRows(self):
        """Number of rows in the sheet, header included."""
        return self.getSheet.nrows

    @property
    def getCol(self):
        """Number of columns in the sheet."""
        return self.getSheet.ncols

    def _column(self, col_index):
        """Return the values of one column for all data rows (header skipped)."""
        sheet = self.getSheet
        return [sheet.cell_value(row, col_index) for row in range(1, sheet.nrows)]

    @property
    def getId(self):
        """Values of column 0 (case id)."""
        return self._column(self._COL_ID)

    @property
    def getName(self):
        """Values of column 1 (case name)."""
        return self._column(self._COL_NAME)

    @property
    def getData(self):
        """Values of column 2 (request data)."""
        return self._column(self._COL_DATA)

    @property
    def getUrl(self):
        """Values of column 3 (URL)."""
        return self._column(self._COL_URL)

    @property
    def getMethod(self):
        """Values of column 4 (HTTP method)."""
        return self._column(self._COL_METHOD)

    @property
    def getStatusCode(self):
        """Values of column 5 (expected status code)."""
        return self._column(self._COL_STATUS_CODE)

    @property
    def getCode(self):
        """Values of column 6 (code)."""
        return self._column(self._COL_CODE)

    @property
    def getMsg(self):
        """Values of column 7 (expected message fields)."""
        return self._column(self._COL_MSG)
6. config.ini文件,全局配置文件
[EMAIL]
sendfrom = xx.com
sendto = xx@qq.com
sendserver = smtp.126.com
port = 25
email_username = xxx
email_password = xxxx
[HTTP]
scheme = http
bom_url = https://baidu.com
scm_url = https://baidu.com
port = 8080
timeout = 10.0
scm_username = xxx
scm_password = xxx
bom_username = xxx
bom_password = xxx
scm_login_url = https://baidu.com
bom_login_url = https://baidu.com
[HEADERS]
authorization =
[DATABASE]
host = 127.0.0.1
username = root
password = 123456
database = xxx
[TEST]
test_file = scm.xlsx
7. const.py, 全局的路径配置
#coding:utf-8
import sys, os
class _const:
    """Namespace of write-once constants that replaces this module in ``sys.modules``.

    Attribute names must be all uppercase, and an attribute that has been set
    once cannot be assigned again.
    """

    class ConstError(TypeError):
        """Raised when an existing constant is assigned again."""
        pass

    class ConstCaseError(ConstError):
        """Raised when a constant name is not all uppercase."""
        pass

    def __setattr__(self, name, value):
        if name in self.__dict__:
            raise self.ConstError("can't change const %s" % name)
        if not name.isupper():
            raise self.ConstCaseError('const name "%s" is not all uppercase' % name)
        self.__dict__[name] = value


# Assign through the INSTANCE so that __setattr__ actually guards the values.
# Setting them on the class (``_const.X = ...``) bypassed the guard, so every
# "constant" could silently be overwritten later.
_constants = _const()
_cwd = os.getcwd()
_constants.CONFPATH = _cwd + "/config.ini"
_constants.CASEFILEPATH = _cwd + "/test_case"
_constants.REPORTPATH = _cwd + "/report/"
_constants.LOGPATH = _cwd + "/Log/"
_constants.TESTFILE = _cwd + "/test_File"
# From now on ``import const`` yields the guarded instance instead of the module.
sys.modules[__name__] = _constants
8. readConfig.py, 读取配置文件
import codecs
import configparser
import const
# Path of the configuration file, taken from const.CONFPATH.
configPath = const.CONFPATH
class ReadConfig(object):
    """Typed-by-section accessors for the INI configuration file at ``configPath``."""

    def __init__(self):
        """Load the configuration file.

        Raises:
            OSError: If the file cannot be opened.
        """
        self.cf = configparser.ConfigParser()
        # 'utf-8-sig' skips a leading UTF-8 BOM if present. The previous check
        # compared a str slice with the bytes constant codecs.BOM_UTF8, which
        # is never equal on Python 3, so a BOM-prefixed file failed to parse.
        with open(configPath, encoding='utf-8-sig') as fd:
            self.cf.read_file(fd)

    def get_email(self, name):
        """Return option ``name`` from the [EMAIL] section."""
        return self.cf.get("EMAIL", name)

    def get_http(self, name):
        """Return option ``name`` from the [HTTP] section."""
        return self.cf.get("HTTP", name)

    def get_headers(self, name):
        """Return option ``name`` from the [HEADERS] section."""
        return self.cf.get("HEADERS", name)

    def set_headers(self, name, value):
        """Set option ``name`` in [HEADERS] and write the whole config back to disk."""
        self.cf.set("HEADERS", name, value)
        # Written as UTF-8 without BOM, matching how the file is read.
        with open(configPath, 'w', encoding='utf-8') as f:
            self.cf.write(f)

    def get_url(self, name):
        """Return option ``name`` from the [URL] section.

        NOTE(review): raises configparser.NoSectionError when the file has no
        [URL] section - confirm the section exists in the deployed config.
        """
        return self.cf.get("URL", name)

    def get_db(self, name):
        """Return option ``name`` from the [DATABASE] section."""
        return self.cf.get("DATABASE", name)

    def get_test_file(self, name):
        """Return option ``name`` from the [TEST] section."""
        return self.cf.get("TEST", name)
9. 执行所有的脚本
import unittest,time,os
import HTMLTestRunner
import HTMLTestReportCN
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import const
from readConfig import ReadConfig
"""
1. 下面这段代码,要执行的是:从allcase_list文件中获取需要的执行的用例
2. 循环遍历list然后添加到testunit
"""
# alltestnames = allcase_list.caselist()
#
# testunit = unittest.TestSuite()
#
# for testname in alltestnames:
# testunit.addTest(unittest.makeSuite(testname))
"""
1. 使用discover实现,从某个目录中获取需要执行的用例文件
2. start_dir:指定要遍历的文件目录
3. pattern:指定要遍历的文件,通过正则来过滤
4. top_level_dir:默认设置为None
"""
class OptionTest(object):
    """Discover the test scripts, run them into an HTML report and mail the newest report."""

    def creatsuitel(self):
        """Collect every test found in ``start_*.py`` files under ``const.CASEFILEPATH``.

        Returns:
            unittest.TestSuite: Suite containing all discovered tests.
        """
        testunit = unittest.TestSuite()
        discover = unittest.defaultTestLoader.discover(
            start_dir=const.CASEFILEPATH,
            pattern='start_*.py',
            top_level_dir=None,
        )
        # addTests() accepts the discovered suite as-is; the former nested
        # loop raised TypeError when a module failed to import, because the
        # loader then yields a single (non-iterable) failing test case.
        testunit.addTests(discover)
        print(testunit)
        return testunit

    def runnerTestSuite(self):
        """Run the discovered suite and write a timestamped HTML report to ``const.REPORTPATH``."""
        ISOTIMEFORMAT = '%Y-%m-%d-%H-%M-%S'
        nowTime = time.strftime(ISOTIMEFORMAT, time.localtime())
        # open() does not create missing directories.
        os.makedirs(const.REPORTPATH, exist_ok=True)
        file = const.REPORTPATH + nowTime + "result.html"
        with open(file, 'wb') as stream:
            runner = HTMLTestReportCN.HTMLTestRunner(
                stream=stream,
                title=u'测试报告',
                description=u"测试报告",
                verbosity=2
            )
            runner.run(self.creatsuitel())

    def _load_email_settings(self):
        """Fill SMTP attributes that are not set yet from the [EMAIL] config section."""
        wanted = (
            ('sendServer', 'sendserver'),
            ('port', 'port'),
            ('email_username', 'email_username'),
            ('email_password', 'email_password'),
            ('sendFrom', 'sendfrom'),
            ('sendTo', 'sendto'),
        )
        missing = [(attr, key) for attr, key in wanted if not hasattr(self, attr)]
        if missing:
            config = ReadConfig()
            for attr, key in missing:
                setattr(self, attr, config.get_email(key))

    def sendEmail(self, file_new):
        """Mail the report ``file_new`` as HTML body and as attachment.

        Args:
            file_new: Path of the HTML report to send.
        """
        from email.mime.application import MIMEApplication

        # These attributes were read but never assigned anywhere before.
        self._load_email_settings()

        # Read the report once; it is used for both body and attachment.
        with open(file_new, 'rb') as report:
            report_bytes = report.read()

        msg = MIMEMultipart()
        # Mail body.
        msg.attach(MIMEText(report_bytes.decode('utf-8', errors='replace'), _subtype='html', _charset='utf-8'))
        # Headers.
        recipients = [addr.strip() for addr in str(self.sendTo).split(',') if addr.strip()]
        msg['Subject'] = u"测试报告"
        msg['From'] = self.sendFrom
        msg['To'] = ', '.join(recipients)
        msg['date'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

        # Attachment. MIMEApplication sets application/octet-stream and base64
        # itself; the old code appended a second Content-Type header and used
        # the literal text "filename[-1]" as the file name.
        attachment = MIMEApplication(report_bytes)
        attachment.add_header('Content-Disposition', 'attachment', filename=os.path.basename(file_new))
        msg.attach(attachment)

        # Send. The context manager issues QUIT and closes the socket even
        # when connect/login/sendmail raises.
        with smtplib.SMTP() as smtp:
            smtp.connect(self.sendServer, int(self.port))
            smtp.login(self.email_username, self.email_password)
            try:
                # sendmail() needs the serialized message, not the object.
                smtp.sendmail(self.sendFrom, recipients, msg.as_string())
                print('邮箱发送成功!')
            except smtplib.SMTPException as exc:
                print('发送不成功,退信!!')
                print(exc)

    def sendReport(self):
        """Find the most recently modified report file and mail it."""
        result_dir = const.REPORTPATH
        candidates = [os.path.join(result_dir, entry) for entry in os.listdir(result_dir)]
        reports = [path for path in candidates if os.path.isfile(path)]
        if not reports:
            # Nothing to send; indexing an empty list used to raise IndexError.
            print(u'没有找到测试报告')
            return
        file_new = max(reports, key=os.path.getmtime)
        print(u'最新生成的报告' + os.path.basename(file_new))
        self.sendEmail(file_new)


if __name__ == '__main__':
    test = OptionTest()
    test.runnerTestSuite()
    # test.sendReport()
三、接口Excel文档截图,如图所示
- data:接口请求的参数均使用json串
- id:只是为了记录接口名字
- status_code:验证返回的状态码
- msg:存放的是需要验证的字段
image.png
四、执行测试用例后,展示的测试报告
1. 执行通过的报告

2. 执行未通过的报告

五、执行测试用例后,log目录下同时也会生成对应的log

网友评论