因为请求API时,需要带token这些,所以requests库每次请求的主机头,需要带上Authorization这个header,且格式为"Bearer xxxxxxxxxx.xxxxxxxxx.xxxxxxx"的形式。 于是索性把get和post, response都封装一下,快~~~
Class代码:
初始化时,就调用生成token的方法
class ReqClient:
def __init__(self, url, username, password):
self.req_url = url
self.username = username
self.password = password
self.token = self._get_token("/token")
def _get_token(self, path):
try:
res = requests.post(
self.req_url + path,
params={
'username': self.username,
'password': self.password,
},
verify=False,
)
res_dict = res.json()
if 'access_token' in res_dict:
return res_dict['access_token']
else:
raise Exception('未获取到访问token!')
except Exception as e:
raise Exception(e)
def get(self, path, **kwargs):
"""封装get方法"""
# 获取请求参数
params = kwargs.get("params", {})
headers = kwargs.get("headers", {})
headers['Authorization'] = "Bearer {}".format(self.token)
try:
result = requests.get(self.req_url + path,
params=params,
headers=headers,
verify=False)
return result
except Exception as e:
raise Exception("get请求错误: %s" % e)
def post(self, path, **kwargs):
"""封装post方法"""
# 获取请求参数
params = kwargs.get("params", {})
data = kwargs.get("data", {})
jn = kwargs.get("json", {})
headers = dict()
headers['Content-Type'] = 'application/json'
headers['Authorization'] = 'Bearer {}'.format(self.token)
try:
result = requests.post(self.req_url + path,
params=params,
data=data,
json=jn,
headers=headers,
verify=False)
return result
except Exception as e:
raise Exception("post请求错误: %s" % e)
def res(self, data):
if data.status_code == 200:
json_data = json.loads(data.text)
if json_data['code'] == 0:
return json_data['result']
else:
return json_data['message']
else:
return "系统内部出现了错误: %s" % self.portal_url
Test代码:
# demo 获取一个APP
app_get_path = '/app/get'
params = {'name': 'ABCD-EFG'}
res_data = req_client.res(req_client.get(app_get_path, params=params))
if isinstance(res_data, dict):
print(json.dumps(res_data, ensure_ascii=False, indent=4))
网友评论