HttpUser类
使用此类时,表示进行压力测试的Http虚拟用户,每个用户都获得一个client
属性,可用于发出Http请求,使用get
、post
、put
、delete
等方法。在该类下,可以直接利用@task
装饰器声明任务,也可由tasks
属性定义。
@task装饰器
它在HttpUser
类中起到声明任务的作用,可选择设置权重,从而控制执行率。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@task(10)
def one(self):
self.client.get("/one")
@task
def two(self):
self.client.get("/two")
上述脚本表示执行one的概率是two的10倍
@tag装饰器
@tag
装饰器用于标识在运行压力测试时执行什么任务。例如下方脚本中,启动测试时应用--tags test
参数,则仅会执行test
任务,不会执行test2
任务。
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@tag('test')
@task
def one(self):
self.client.get("/one")
@tag('test2')
@task(2)
def two(self):
self.client.get("/two")
on_start/on_stop
开始/停止执行所在任务列表时调用。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
def on_start(self):
self.client.post("/about",{"username":"name","password":"password"})
print("1111111")
def on_stop(self):
print("2222222")
@task
def test(self):
print("55555555")
上述脚本表示,开始运行时直接执行on_start任务,进行过程中执行test任务,结束任务时执行on_stop任务。可用于基于登录操作的流程测试。
weight
在一个文件中,建立多个HttpUser
类时,weight
代表执行该类下测试的可能性,数值越大,执行时被选择的可能性越高。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
weight=3
...
class startUser2(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
weight=1
执行startUser1的概率是startUser2的3倍。
tasks
tasks
属性可以是列表,每次随机选择任务;也可以是字典,按设置的比例执行任务。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@task
def one(self):
self.client.get("/one")
def two(self):
self.client.get("/two")
tasks=[one,two]
#tasks ={one:3,two:1}
#one及two代表已建立TaskSet任务实例,执行可能性是two的3倍
TaskSet类
目前可在HttpUser
中直接定义任务,也可在TaskSet
中定义压力测试中用户执行的任务。
注意如果是继承TaskSet
定义任务,则需要引入User
类,而后在启动执行User
类或HttpUser
类,否则只是声明了任务,执行会报错No User Class Found
。
开始运行时,会从tasks
属性中选择一个任务,休眠wait_time
定义的时间长度后,执行下个任务。
interrupt(reschedule=True)
TaskSet
可以嵌套,在tasks
属性中可以包含另一个TaskSet
,如果需要中断,则需要在子TaskSet
中调用interrupt
,中断任务并将执行控制移交给父TaskSet
。
如果reschedule
为True
(默认),则父TaskSet
将立即重新安排并执行新任务。
from locust import HttpUser,task,TaskSet,constant
class startUser1(TaskSet):
wait_time = constant(1)
@task
def onetask(self):
self.client.get("/")
print("55555555")
@task
def stop(self):
self.interrupt()
class startUser2(HttpUser):
host='https://www.baidu.com'
wait_time = constant(1)
tasks = [startUser1]
@task
def twotask(self):
self.client.get("/")
print("33333")
SequentialTaskSet类
SequentialTaskSet
类其实是TaskSet
类,但该类下的任务是按顺序执行。
from locust import User,task,SequentialTaskSet,constant
class startUser1(SequentialTaskSet):
wait_time = constant(1)
@task
def fist_task(self):
print("1")
@task
def second_task(self):
print("2")
@task
def third_task(self):
print("3")
class startUser2(User):
wait_time = constant(1)
tasks=[startUser1]
#执行任务时,会依次打印出1、2、3
手动控制请求成功or失败
通过使用catch_response
参数(catch_response
值为布尔类型,如果设置为True
, 允许该请求被标记为成功或失败)和 with
语句,设定请求失败or成功条件。
标记失败
虽然有时候最终接口访问正常,但仍可设计一些条件,将其标记为失败。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@task
def one(self):
with self.client.get("/",catch_response=True) as response:
if response.elapsed.total_seconds() > 0.01:
response.failure("Request took too long")
请求时长大于0.01秒,标记失败
elapsed
表示从发送请求到响应到达之间经过的时间
标记成功
有的时候虽然返回4**的状态码,但是是符合预期的,这个时候就需要我们主动将其标记为成功。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@task
def fifth_task(self):
with self.client.get("/",catch_response=True) as response:
if response.status_code==404:
response.success()
明明是之前整理的知识点,但是重新写一次仍然用了很久....甚至比新写的用的时间还久....再次复习了一次...但是自己已经写毛了....
自我记录,有错误欢迎指正~
网友评论