-
一个完整业务流程的测试,一些动态数据基本都要进行参数化,比如虚拟用户的登录账号,或者随机访问商品详情的商品id
from locust import HttpUser,TaskSet,between,task import os,sys curPath = os.path.abspath(os.path.dirname(__file__)) rootPath = os.path.split(curPath)[0] PathProject = os.path.split(rootPath)[0] sys.path.append(rootPath) sys.path.append(PathProject) n = 0 class UserBehavior(TaskSet): def get_account(self): global n mobiles = ['1300000000','1300000001','1300000002','1300000003'] n += 1 #数据循环使用 if n>=len(mobiles): n = 0 return mobiles[n] @task(4) def test_login(self): """ 登录用户 :return: """ mobile = self.get_account() url = '/login' param={'mobile':mobile} with self.client.post(url,json=param,headers={},catch_response = True) as response: print("成功登录账号",mobile) class WebsiteUser(HttpUser): host = 'http://127.0.0.1' tasks = [UserBehavior] wait_time = between(1, 2) if __name__ == '__main__': os.system("locust -f ccc.py")
-
使用队列进行参数化
一、
from locust import HttpUser,TaskSet,between,task
import os,sys
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
PathProject = os.path.split(rootPath)[0]
sys.path.append(rootPath)
sys.path.append(PathProject)
import queue
n = 0
class UserBehavior(TaskSet):
@task(4)
def test_login(self):
"""
登录用户
:return:
"""
try:
mobile = self.parent.queueData.get() # 获取队列里的数据
url = '/login'
param = {'mobile': mobile}
with self.client.post(url, json=param, headers={}, catch_response=True) as response:
print("成功登录账号", mobile)
except queue.Empty:
# 队列取空后,直接退出
print('no data exist')
class WebsiteUser(HttpUser):
host = 'http://127.0.0.1'
tasks = [UserBehavior]
wait_time = between(1, 2)
queueData = queue.Queue() # 队列实例化
for count in range(0,5): # 循环数据生成
data = {
"username": str(13000000000 + count)
}
queueData.put_nowait(data)
if __name__ == '__main__':
os.system("locust -f ccc.py")
二:
上面队列的例子,参数化最多数据是5个,如果在并发参数设置为100个用户的时候,最多并发还是只有5个
我们可以进行数据循环使用
from locust import HttpUser,TaskSet,between,task
import os,sys
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
PathProject = os.path.split(rootPath)[0]
sys.path.append(rootPath)
sys.path.append(PathProject)
import queue
n = 0
class UserBehavior(TaskSet):
@task(4)
def test_login(self):
"""
登录用户
:return:
"""
try:
mobile = self.parent.queueData.get() # 获取队列里的数据
self.parent.queueData.put_nowait(mobile) # 再将取出的数据插入队尾,对数据进行循环使用
url = '/login'
param = {'mobile': mobile}
with self.client.post(url, json=param, headers={}, catch_response=True) as response:
print("成功登录账号", mobile)
except queue.Empty:
# 队列取空后,直接退出
print('no data exist')
class WebsiteUser(HttpUser):
host = 'http://127.0.0.1'
tasks = [UserBehavior]
wait_time = between(1, 2)
queueData = queue.Queue() # 队列实例化
for count in range(0,5): # 循环数据生成
queueData.put_nowait(str(13000000000 + count))
if __name__ == '__main__':
os.system("locust -f ccc.py")
网友评论