# project > core > restclient.py
import requests
import json
from utils.read_test import base_data
from utils.log_util import logger
api_root_url = base_data.read_ini()['host']['api_sit_url']
class RestClient:
def __init__(self):
self.api_root_url = api_root_url
self.session = requests.Session()
def get(self, url, **kwargs):
return self.request(url, "GET", **kwargs)
def post(self, url, **kwargs):
return self.request(url, "POST", **kwargs)
def put(self, url, **kwargs):
return self.request(url, "PUT", **kwargs)
def delete(self, url, **kwargs):
return self.request(url, "DELETE", **kwargs)
def request(self, url, method, **kwargs):
self.request_log(url, method, **kwargs)
if method == "GET":
return self.session.get(self.api_root_url + url, **kwargs)
if method == "POST":
return self.session.post(self.api_root_url + url, **kwargs)
if method == "PUT":
return self.session.put(self.api_root_url + url, **kwargs)
if method == "DELETE":
return self.session.delete(self.api_root_url + url, **kwargs)
def request_log(self, url, method, **kwargs):
data = dict(**kwargs).get("data")
json_data = dict(**kwargs).get("json")
params = dict(**kwargs).get("params")
headers = dict(**kwargs).get("headers")
logger.info("接口请求的地址>>>{}".format(self.api_root_url + url))
logger.info("接口请求的方法>>>{}".format(method))
if data is not None:
logger.info("接口请求的data参数>>>\n{}".format(json.dumps(data, ensure_ascii=False, indent=2)))
if json_data is not None:
logger.info("接口请求的json参数>>>\n{}".format(json.dumps(json_data, ensure_ascii=False, indent=2)))
if params is not None:
logger.info("接口请求的params参数>>>\n{}".format(json.dumps(params, ensure_ascii=False, indent=2)))
if headers is not None:
logger.info("接口请求的headers参数>>>\n{}".format(json.dumps(headers, ensure_ascii=False, indent=2)))
project > config > setting.ini
[host]
api_sit_url = https://jsonplaceholder.typicode.com
# project > test_cases > test_post.py
import pytest
from api.api import try_json
from utils.read_test import base_data
from utils.log_util import logger
def test_post():
json_data = base_data.read_data()['json_data']
result = try_json(json_data)
assert result['id'] == 101
if __name__ == '__main__':
pytest.main()
# project > api > api.py
from core.api_util import api_util
from utils.response_util import process_response
def mobile_query(params):
response = api_util.get_mobile_belong(params=params)
result = process_response(response)
return result
def try_json(json_data):
"""
This method is to test json parameters
:param json_data:
:return:
"""
response = api_util.post_data(json=json_data)
return response.json()
# project > core > api_util.py
from core.rest_client import RestClient
class Api(RestClient):
def __init__(self):
super().__init__()
def get_mobile_belong(self, **kwargs):
return self.get('/shouji/query', **kwargs)
def post_data(self, **kwargs):
return self.post('/posts', **kwargs)
api_util = Api()
# project > core > rest_client.py
import requests
import json
from utils.read_test import base_data
from utils.log_util import logger
api_root_url = base_data.read_ini()['host']['api_sit_url']
class RestClient:
def __init__(self):
self.api_root_url = api_root_url
self.session = requests.Session()
def get(self, url, **kwargs):
return self.request(url, "GET", **kwargs)
def post(self, url, **kwargs):
return self.request(url, "POST", **kwargs)
def put(self, url, **kwargs):
return self.request(url, "PUT", **kwargs)
def delete(self, url, **kwargs):
return self.request(url, "DELETE", **kwargs)
def request(self, url, method, **kwargs):
self.request_log(url, method, **kwargs)
if method == "GET":
return self.session.get(self.api_root_url + url, **kwargs)
if method == "POST":
return self.session.post(self.api_root_url + url, **kwargs)
if method == "PUT":
return self.session.put(self.api_root_url + url, **kwargs)
if method == "DELETE":
return self.session.delete(self.api_root_url + url, **kwargs)
def request_log(self, url, method, **kwargs):
data = dict(**kwargs).get("data")
json_data = dict(**kwargs).get("json")
params = dict(**kwargs).get("params")
headers = dict(**kwargs).get("headers")
logger.info("接口请求的地址>>>{}".format(self.api_root_url + url))
logger.info("接口请求的方法>>>{}".format(method))
if data is not None:
logger.info("接口请求的data参数>>>\n{}".format(json.dumps(data, ensure_ascii=False, indent=2)))
if json_data is not None:
logger.info("接口请求的json参数>>>\n{}".format(json.dumps(json_data, ensure_ascii=False, indent=2)))
if params is not None:
logger.info("接口请求的params参数>>>\n{}".format(json.dumps(params, ensure_ascii=False, indent=2)))
if headers is not None:
logger.info("接口请求的headers参数>>>\n{}".format(json.dumps(headers, ensure_ascii=False, indent=2)))
# project > util > read_test.py
import yaml
import configparser
import os
current_path = os.path.realpath(__file__)
parent_path = os.path.dirname(current_path)
data_path = os.path.join(os.path.dirname(parent_path), "data", "data.yaml")
ini_path = os.path.join(os.path.dirname(parent_path), "config", "setting.ini")
class FileRead:
def __init__(self):
self.data_path = data_path
self.ini_path = ini_path
def read_data(self):
f = open(self.data_path, encoding="utf8")
data = yaml.safe_load(f)
return data
def read_ini(self):
config = configparser.ConfigParser()
config.read(self.ini_path, encoding='utf8')
return config
base_data = FileRead()
# project > utils > log_util.py
import logging
import os.path
import time
root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
log_path = os.path.join(root_path, "log")
class Logger:
def __init__(self):
# 定义日志位置和文件名
self.logname = os.path.join(log_path, "{}.log".format(time.strftime("%Y-%m-%d")))
# 定义一个日志容器
self.logger = logging.getLogger("log")
# 设置日志打印的级别
self.logger.setLevel(logging.DEBUG)
# 创建日志输入的格式
self.formater = logging.Formatter(
'[%(asctime)s][%(filename)s %(lineno)d][%(levelname)s]: %(message)s')
# 创建日志处理器,用来存放日志文件
self.filelogger = logging.FileHandler(self.logname, mode='a', encoding="UTF-8")
# 创建日志处理器,在控制台打印
self.console = logging.StreamHandler()
# 设置控制台打印日志界别
self.console.setLevel(logging.DEBUG)
# 文件存放日志级别
self.filelogger.setLevel(logging.DEBUG)
# 文件存放日志格式
self.filelogger.setFormatter(self.formater)
# 控制台打印日志格式
self.console.setFormatter(self.formater)
# 将日志输出渠道添加到日志收集器中
self.logger.addHandler(self.filelogger)
self.logger.addHandler(self.console)
def test(self):
self.logger.info("hahahaha")
logger = Logger().logger
if __name__ == '__main__':
logger.debug("我打印debug日志")
logger.info("我打印info日志")
logger.warning("我打印warning日志")
logger.error("我打印error日志")
# Logger().test()
# project > utils > response_util.py
import json
from core.ResultBase import ResultResponse
from utils.log_util import logger
def process_response(response):
if response.status_code == 200 or response.status_code == 201:
ResultResponse.success = True
ResultResponse.body = response.json()
logger.info("接口的返回内容>>>:" + json.dumps(response.json(), ensure_ascii=False))
else:
ResultResponse.success = False
logger.info("接口状态码不是2开头,请检查")
logger.info("接口的返回内容>>>:" + json.dumps(response.json(), ensure_ascii=False))
return ResultResponse
# project > core > ResultBase.py
# 用来组装response
class ResultResponse:
pass
project > test_cases > conftest.py
import pytest
from utils.log_util import logger
@pytest.fixture(scope="function", autouse=True)
def func():
logger.info("test_case starting...")
yield
logger.info("test_case completed...")
网友评论