示例1 locustfile.py 和 run_test.py
#!/usr/local/python3
# -*- coding: utf-8 -*-
# @Date : 2018-04-15 09:00:00
# @Author : Canon
# @Link : https://www.python.org
# @Version : 3.6.1
from locust import HttpLocust
from locust import TaskSet
from locust import task
class BlogDemo(TaskSet):
'''用户行为:打开我的博客首页demo'''
@task(1)
def open_blog(self):
# 定义requests的请求头
header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}
r = self.client.get("/s?ie=UTF-8&wd=python", headers=header, verify=False)
print(r.status_code)
assert r.status_code == 200
class websitUser(HttpLocust):
task_set = BlogDemo
min_wait = 3000 # 单位毫秒
max_wait = 6000 # 单位毫秒
#!/usr/local/python3
# -*- coding: utf-8 -*-
# @Date : 2018-04-15 09:00:00
# @Author : Canon
# @Link : https://www.python.org
# @Version : 3.6.1
if __name__ == "__main__":
import os
os.system("locust -f E:\PythonCase\PyScript\LocustTest\Example\locustfile.py --host=https://www.baidu.com")
示例2 locustfile.py 和 run_locust.py
#!/usr/local/python3
# -*- coding: utf-8 -*-
# @Date : 2018-04-15 09:00:00
# @Author : Canon
# @Link : https://www.python.org
# @Version : 3.6.1
import time
from locust import Locust, TaskSet, events, task
import sys
import os
import requests
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
class test_demo(object):
"""接口测试demo"""
def __init__(self, ):
pass
@staticmethod
def baidu():
url = "https://www.baidu.com/s?ie=UTF-8&wd=python"
response = requests.get(url)
response.raise_for_status()
class HttpClient(object):
def __init__(self):
pass
def test_demo_test_httpbin_get(self):
test_demo().baidu()
def test_demo_test_httpbin_post(self):
pass
def test_demo_test_webservice(self):
pass
class HttpLocust(Locust):
def __init__(self, *args, **kwargs):
super(HttpLocust, self).__init__(*args, **kwargs)
self.client = HttpClient()
class ApiUser(HttpLocust):
min_wait = 10
max_wait = 100
class task_set(TaskSet):
# @task(1)
# def test_demo_test_webservice(self):
# self.client.test_demo_test_webservice()
# @task(1)
# def test_demo_test_httpbin_post(self):
# self.client.test_demo_test_httpbin_post()
@task(2)
def test_demo_httpbin_get(self):
self.client.test_demo_test_httpbin_get()
#!/usr/local/python3
# -*- coding: utf-8 -*-
# @Date : 2018-04-15 09:00:00
# @Author : Canon
# @Link : https://www.python.org
# @Version : 3.6.1
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
import multiprocessing
"""
Please Visit http://localhost:8089/
"""
def start_slave(locust_file, master_port):
print('start slave pid:\t{0}'.format(os.getpid()))
os.system('locust -f {0} --slave --master-port {1}'.format(locust_file, master_port))
if __name__ == '__main__':
multiprocessing.freeze_support()
# locustfile.py
locust_file = "E:\PythonCase\PyScript\LocustTest\Example\locustfile.py"
# locust 运行命令
locust_command = '--no-web -c 10 -r 2 --run-time 5m'
# 运行locust
if 'master' in locust_command:
# 分布式
num = 2
master_port = 5557
record = []
for i in range(num):
process = multiprocessing.Process(target=start_slave, args=(locust_file, master_port))
process.start()
record.append(process)
print('start master pid:\t{0}'.format(os.getpid()))
cmd = 'locust -f {0} {1}'.format(locust_file, locust_command)
print('cmd:\t{0}'.format(cmd))
os.system(cmd)
else:
# 单例模式
cmd = 'locust -f {0} {1}'.format(locust_file, locust_command)
print('cmd:\t{0}'.format(cmd))
os.system(cmd)
网友评论