美文网首页
Python接口自动化框架(python + requests

Python接口自动化框架(python + requests

作者: 324068f66fe6 | 来源:发表于2018-10-30 15:40 被阅读127次

接口自动化框架主要是通过读取EXCEL文件中的参数及预期结果来判断接口的正确性,代码中有部分内容参考了其他大神写的。

一、结构目录总览

image.png

二、列出各个脚本的代码

1. test_case文件夹下面的start_login.py, 这个文件主要是编写测试脚本。
from tool.readExcel import readExcel
from tool.common import PublicMethod
import unittest
import const
from readConfig import ReadConfig
from tool.Logger import Log

read_config = ReadConfig()
log = Log()


class testLoginApi(unittest.TestCase):

    def setUp(self):
        self.file = read_config.get_test_file('test_file')
        self.URL_prefix = read_config.get_http("scm_url") 

    def test_login(self):

        '''测试登录接口'''

        # 获取excel内容
        test_File = const.TESTFILE + '/' + self.file
        excel = readExcel(test_File)
        name = excel.getName
        data = excel.getData
        msg = excel.getMsg

        url = excel.getUrl
        method = excel.getMethod
        id = excel.getId
        status_code = excel.getStatusCode
        code = excel.getCode
        row = excel.getRows
        for i in range(0, row - 1):
            log.info("开始执行【{}】脚本".format(name[i]))
            api = PublicMethod(method[i], self.URL_prefix + url[i], data[i])
            # 存储鉴权值到配置文件
            api.save_authorization()
            res = api.methodAPI()
            apistatus = api.getStatusCode(res)
            apijson = api.getJson(res)

            '''
             1. 验证返回的状态是否正确
             2. 验证Excel中msg的所有值是否都正确
             3. 这边判断msg是否为空,但是不做错误处理,因为有可能没有返回值的,所以就不判断
            '''
            # 判断返回的状态码是否正确
            self.assertEqual(apistatus, int(status_code[i]), msg='状态匹配不成功')
            log.info('{}.{}:状态测试成功'.format(id[i], name[i]))

            # 验证Excel中msg的所有值是否都正确
            if len(msg)-1 != 0:
                # excel取过来的msg转为字典格式
                dict_msg = eval(msg[i])
                # 循环遍历找到对应的key
                for key, value in dict_msg.items():
                    self.assertTrue(key in apijson.keys() or key in apijson['data'].keys(), msg="没有找到要对比的key")
                    response_msg = apijson[key] if key in apijson.keys() else apijson['data'][key]
                    self.assertEqual(dict_msg[key], response_msg, msg='Excel提取的参数跟返回json字段值不相同')
                    log.info('{}.{}:返回的字段[{}]验证成功'.format(id[i], name[i], key))
            else:
                log.info('没有需要验证的参数')
            log.info("*"*30)

        log.info("结束【{}】脚本测试".format(name[i]))
        log.info("-"*30)
2. tool文件夹下面的common.py文件,这个主要是编写一些公共方法
import requests
import json
from readConfig import ReadConfig
from tool.Logger import Log

log = Log()
readconfig = ReadConfig()

class PublicMethod(object):
    def __init__(self, method, url, data):
        self.method = method
        self.url = url
        self.data = data

    @property
    def headers(self):
        authorization = readconfig.get_headers('authorization')
        headers = {"Content-Type": 'application/json'}
        # 公司这边是通过鉴权参数来判断用户是否登录的
        headers['Authorization'] = authorization
        return headers

    def methodAPI(self):
        # 根据不同的访问方式来访问接口
        try:
            if self.method == 'post':
                res = requests.post(self.url, data=json.dumps(eval(self.data)), headers=self.headers)
                log.info("调用【 post 】方式")
            elif self.method == 'get':
                res = requests.get(self.url, params=self.data)
                log.info("调用【 get 】方式")
            return res
        except Exception as e:
            log.error('接口调用method不存在')
            log.error(str(e))

    def getStatusCode(self, res):
        # 获取返回信息status_code
        try:
            status_code = res.status_code
            log.info("获取返回信息status_code【" + str(status_code) + "】")
            return status_code
        except Exception as e:
            log.error('获取status_code失败')
            log.error(str(e))

    def getJson(self, res):
        try:
            json_data = res.json()
            log.info("成功获取返回信息json_data")
            return json_data
        except Exception as e:
            log.error('获取json_data失败')
            log.error(str(e))
    # 获取鉴权的值并存放到配置文件中
    def save_authorization(self):
        try:
            data = {"username": "xxx", "password": "xxx"}
            res = requests.post("https://baidu.com", data=data)
            authorization = res.json()['data']['jwt']
            readconfig.set_headers('authorization', authorization)
            log.info("成功存储参数到配置文件")
        except Exception as e:
            log.error('获取authorization失败')
            log.error(str(e))
3. tool文件夹下面的db_model.py文件,编写ORM的类文件,目前这个没啥用,只是放着以后用
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from readConfig import ReadConfig
from sqlalchemy import Column,String,Integer

"""获取DB配置信息"""
host = ReadConfig.get_db('host')
username = ReadConfig.get_db('username')
password = ReadConfig.get_db('password')
database = ReadConfig.get_db('database')

# engine = create_engine('mysql+pymysql://'+ username +
#                        ":" + password + "@" + host +":3306/" + database)

engine = create_engine('mysql+pymysql://root:123456@localhost/xxxx')

Base = declarative_base()

class EMPLOYEE(Base):
    __tablename__ = 'employee'
    id = Column(Integer, primary_key=True)
    first_name = Column(String(64))
    last_name = Column(String(64))
    age = Column(Integer)
    sex = Column(String(32))
    income = Column(String(64))

Base.metadata.create_all(engine)

4. tool文件夹下面的db_option.py文件,该文件主要是用来封装db操作的方法,目前暂时没用,没有用到。

from sqlalchemy.orm import sessionmaker
from . import db_model
from .db_model import EMPLOYEE


class Option_DB(object):
    DBSession = sessionmaker(bind=db_model.engine)
    session = DBSession()

    def insert(self, first_name, last_name, age, sex, income):
        new_employee = EMPLOYEE(first_name=first_name, last_name=last_name, age=age, sex=sex, income=income)
        self.session.add(new_employee)

        self.session.commit()

    def query(self, param):
        data = self.session.query(EMPLOYEE).filter(EMPLOYEE.first_name == param).first()
        return data

if __name__ == '__main__':
    db = Option_DB()
    # db.insert('wang', 'sads', 18, 'F', '14000')
    new_empolyee = db.query('zhang')
    print(new_empolyee.first_name)

5. tool文件夹下面的Logger.py文件,配置log

import logging, time, os, const

# 这个是日志保存本地的路径
log_path = const.LOGPATH


class Log(object):
    def __init__(self):
        # 文件的命名
        self.logname = os.path.join(log_path, '%s.log'%time.strftime('%Y_%m_%d'))
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        # 日志输出格式
        self.formatter = logging.Formatter('[%(asctime)s] - %(filename)s - %(levelname)s: %(message)s')

    def __console(self, level, message):
        # 创建一个FileHandler,用于写到本地
        fh = logging.FileHandler(self.logname, 'a')  # 追加模式
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(self.formatter)
        self.logger.addHandler(fh)

        # 创建一个StreamHandler,用于输出到控制台
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(self.formatter)
        self.logger.addHandler(ch)

        if level == 'info':
            self.logger.info(message)
        elif level == 'debug':
            self.logger.debug(message)
        elif level == 'warning':
            self.logger.warning(message)
        elif level == 'error':
            self.logger.error(message)
        # 这两行代码是为了避免日志输出重复问题
        self.logger.removeHandler(ch)
        self.logger.removeHandler(fh)
        # 关闭打开的文件
        fh.close()
        ch.close()

    def debug(self, message):
        self.__console('debug', message)

    def info(self, message):
        self.__console('info', message)

    def warning(self, message):
        self.__console('warning', message)

    def error(self, message):
        self.__console('error', message)

6. tool文件夹下面的readExcel.py文件,读取Excel文件信息

import xlrd


class readExcel(object):
    def __init__(self, path):
        self.path = path

    @property
    def getSheet(self):
        # 获取索引
        xl = xlrd.open_workbook(self.path)
        sheet = xl.sheet_by_index(0)
        # print( xl.sheet_names() )   打印所有sheet名字
        # print (sheet.cell_value( 2, 3 ))   打印第3行第4列
        return sheet

    @property
    def getRows(self):
        # 获取行数
        row = self.getSheet.nrows
        return row

    @property
    def getCol(self):
        # 获取列数
        col = self.getSheet.ncols
        return col

    # 以下是分别获取每一列的数值

    @property
    def getId(self):
        TestId = []
        for i in range( 1, self.getRows ):
            TestId.append( self.getSheet.cell_value( i, 0 ) )
        # print(TestName)
        return TestId

    @property
    def getName(self):
        TestName = []
        for i in range(1, self.getRows):
            TestName.append(self.getSheet.cell_value(i, 1))
        # print(TestName)
        return TestName

    @property
    def getData(self):
        TestData = []
        for i in range(1, self.getRows):
            TestData.append(self.getSheet.cell_value(i, 2))
        return TestData

    @property
    def getUrl(self):
        TestUrl = []
        for i in range(1, self.getRows):
            TestUrl.append(self.getSheet.cell_value(i, 3))
        return TestUrl

    @property
    def getMethod(self):
        TestMethod = []
        for i in range(1, self.getRows):
            TestMethod.append(self.getSheet.cell_value(i, 4))
        return TestMethod

    @property
    def getStatusCode(self):
        TestUid = []
        for i in range(1, self.getRows):
            TestUid.append(self.getSheet.cell_value(i, 5))
        return TestUid

    @property
    def getCode(self):
        TestCode = []
        for i in range(1, self.getRows):
            TestCode.append(self.getSheet.cell_value(i, 6))
        return TestCode

    @property
    def getMsg(self):
        test_msg = []
        for i in range(1, self.getRows):
            test_msg.append(self.getSheet.cell_value(i, 7))
        return test_msg

6. config.ini文件,全局配置文件

[EMAIL]
sendfrom = xx.com
sendto = xx@qq.com
sendserver = smtp.126.com
port = 25
email_username = xxx
email_password = xxxx

[HTTP]
scheme = http
bom_url = https://baidu.com
scm_url = https://baidu.com
port = 8080
timeout = 10.0
scm_username = xxx
scm_password = xxx
bom_username = xxx
bom_password = xxx
scm_login_url = https://baidu.com
bom_login_url = https://baidu.com

[HEADERS]
authorization = 

[DATABASE]
host = 127.0.0.1
username = root
password = 123456
database = xxx

[TEST]
test_file = scm.xlsx

7. const.py, 全局的路径配置

#coding:utf-8
import sys, os


class _const:

  class ConstError(TypeError): pass

  class ConstCaseError(ConstError): pass

  def __setattr__(self, name, value):
      if name in self.__dict__:
          raise self.ConstError("can't change const %s" % name)
      if not name.isupper():
          raise self.ConstCaseError('const name "%s" is not all uppercase' % name)
      self.__dict__[name] = value


sys.modules[__name__] = _const()

_const.CONFPATH = os.getcwd() + "/config.ini"
_const.CASEFILEPATH = os.getcwd() + "/test_case"
_const.REPORTPATH = os.getcwd() + "/report/"
_const.LOGPATH = os.getcwd() + "/Log/"
_const.TESTFILE = os.getcwd() + "/test_File"

8 readConfig.py, 读取配置文件

import codecs
import configparser
import const

configPath = const.CONFPATH

class ReadConfig(object):
    def __init__(self):
        fd = open(configPath)
        data = fd.read()

        #  remove BOM
        if data[:3] == codecs.BOM_UTF8:
            data = data[3:]
            file = codecs.open(configPath, "w")
            file.write(data)
            file.close()
        fd.close()

        self.cf = configparser.ConfigParser()
        self.cf.read(configPath)

    def get_email(self, name):
        value = self.cf.get("EMAIL", name)
        return value

    def get_http(self, name):
        value = self.cf.get("HTTP", name)
        return value

    def get_headers(self, name):
        value = self.cf.get("HEADERS", name)
        return value

    def set_headers(self, name, value):
        self.cf.set("HEADERS", name, value)
        with open(configPath, 'w+') as f:
            self.cf.write(f)

    def get_url(self, name):
        value = self.cf.get("URL", name)
        return value

    def get_db(self, name):
        value = self.cf.get("DATABASE", name)
        return value

    def get_test_file(self, name):
        value = self.cf.get("TEST", name)
        return value

9 执行所有的脚本

import unittest,time,os
import HTMLTestRunner
import HTMLTestReportCN
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import const
from readConfig import ReadConfig

"""
    1. 下面这段代码,要执行的是:从allcase_list文件中获取需要的执行的用例
    2. 循环遍历list然添加到testunit
"""
# alltestnames = allcase_list.caselist()
#
# testunit = unittest.TestSuite()
#
# for testname in alltestnames:
#     testunit.addTest(unittest.makeSuite(testname))

"""
    1. 使用discover实现,从某个目录中获取需要执行的用例文件
    2. start_dir:指定要遍历的文件目录
    3. pattern:指定要遍历的文件,通过正则来过滤
    4. top_level_dir:默认设置为None
"""


class OptionTest(object):

    def creatsuitel(self):
        testunit = unittest.TestSuite()
        discover = unittest.defaultTestLoader.discover(
            start_dir=const.CASEFILEPATH,
            pattern='start_*.py',
            top_level_dir=None
        )

        for test_suit in discover:
            for test_case in test_suit:
                testunit.addTest(test_case)
        print(testunit)
        return testunit


    """ 执行测试用例"""
    def runnerTestSuite(self):

        #执行测试套件

        ISOTIMEFORMAT = '%Y-%m-%d-%H-%M-%S'

        nowTime = time.strftime(ISOTIMEFORMAT, time.localtime())

        file = const.REPORTPATH + nowTime + "result.html"

        with open(file, 'wb') as filename:
            runner = HTMLTestReportCN.HTMLTestRunner(
                stream=filename,
                title=u'测试报告',
                description=u"测试报告",
                verbosity=2
            )
            runner.run(self.creatsuitel())

    """发送报告"""

    def sendEmail(self, file_new):

        #读取最新的报告文件
        with open(file_new, 'rb') as filename:
            mail_body = filename.read()
        # print('--------------------------')
        # print(mail_body)

        msg = MIMEMultipart()

        #定义文件正文
        msg.attach(MIMEText(mail_body, _subtype='html', _charset='utf-8'))
        #定义标题
        msg['Subject'] = u"测试报告"
        str_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        msg['date'] = str_time

        # 添加附件
        att1 = MIMEText(open(file_new, 'rb').read(), 'base64', 'utf-8')
        att1["Content-Type"] = 'application/octet-stream'
        filename = file_new.split('/')
        att1["Content-Disposition"] = 'attachment; filename=filename[-1]'
        msg.attach(att1)
        #发送步骤
        smtp = smtplib.SMTP()
        smtp.connect(self.sendServer, self.port)
        smtp.login(self.email_username, self.email_password)
        try:
            smtp.sendmail(self.sendFrom, self.sendTo, msg)
            print('邮箱发送成功!')
        except:
            print('发送不成功,退信!!')
        smtp.quit()


    """获取最后一份报告文件并发送邮件"""
    def sendReport(self):
        """获取最新的文件"""
        result_dir = "./report/"
        lists = os.listdir(result_dir)
        lists.sort(key=lambda fn: os.path.getmtime(result_dir + fn) if not os.path.isdir(result_dir + fn) else 0)
        print(u'最新生成的报告' + lists[-1])
        #找到最新生成的文件
        file_new = os.path.join(result_dir, lists[-1])
        self.sendEmail(file_new)


if __name__ == '__main__':
    test = OptionTest()
    test.runnerTestSuite()
    # test.sendReport()

三、接口Excel文档截图,如图所示

  1. data:接口请求的参数均使用json串
  2. id:只是为了记录接口名字
  3. status_code:验证返回的状态码
  4. msg : 存放的是需要验证的字段


    image.png

四、执行测试用例后,展示的测试报告

1. 执行通过的报告
image.png
2. 执行未通过的报告
image.png

五、执行测试用例后,log目录下同时也会生成对应的log

image.png

相关文章

网友评论

      本文标题:Python接口自动化框架(python + requests

      本文链接:https://www.haomeiwen.com/subject/fadbnftx.html