Locust 介绍
它采用纯 Python 实现,是一个分布式用户负载测试的工具。 使用基于 Requests 库的客户端发起请求,使编写脚本大大简化; 在模拟并发方面摒弃进程和线程,完全基于时间驱动,采用协程(gevent)提供的非阻塞 IO 和 coroutine 来实现网络层的并发请求。因此单台压力机也能产生数千并发请求数。
特点:
- 基于 python 的 Requests 和 Gevent , 脚本编写简单易读
- c/s 架构有一个简洁的用户界面,实时显示相关测试结果
- 支持分布式测试, 能跨平台易于扩展
安装及使用
安装
支持Python版本: 3.6, 3.7, 3.8
安装方式: pip install locust -i https://pypi.tuna.tsinghua.edu.cn/simple
案例
- locust 中通过 HttpUser 来表示一个用户,我们需要自定类继承自 HttpUser
- HttpUser 的 client 属性可以模拟客户端请求,支持 get,post 等待方法,并且自动保持 session。(实际上他就是 requests 包的 session 对象)
-- client 的 get,post 等方法传递 url 地址时,不需要携带 http://www.example.com 这个前缀,只需要提供路径就行 - wait_time 下一个任务执行之前等待的时间,用于模拟用户的思考时间,
- host 表示服务器的地址,例如: http://www.baidu.com
通过 @task 装饰一个方法,这个方法表示一个用户可能执行的动作,一个用户会有多种动作;
--- 一个用户下一步具体要执行什么动作,是随机的,也就是 locust 会从动作列表中随机选择一个
--- @task 接受一个参数,表示权重,数值越大,这个动作被选中的概率就越高
- on_start 测试之前执行的操作
- on_stop 测试结束执行的操作
locustfile.py
from locust import HttpUser, task, between
# 相当于模拟一个用户
class WebUser(HttpUser):
# 服务器的地址
host = 'http://localhost:8000'
# 下一个任务执行之前等待的时间,用于模式用户的思考时间
wait_time = between(3, 5) # 这里使用随机 3,5 秒钟
# 定义一个测试任务,类似于一个取样器
@task(1)
def info(self):
# 获取用户信息
self.client.get('/info/')
@task(10)
def addresses(self):
# 获取用户地址
self.client.get('/addresses/')
def on_start(self):
# 测试之前执行的操作
print('用户登录')
auth = {'username': 'root', 'password': 'root1234'}
self.client.post('/login/', json=auth)
def on_stop(self):
# 测试结束执行的操作
print('用户退出')
self.client.delete('/logout/')
启动locust
- 在当前目录下, 可以直接运行, 或者在终端下使用命令运行:
locust
他会自动找到名为locustfile.py
的文件 - 为了子目录下或者不同级别目录下, 可以使用-f 命令启动
locust -f xxx/locustfile.py
然后打开浏览器, 直接在浏览器输入http://localhost:8089打开UI界面, 界面如下:
locust 无界面测试
locust -f locustfile.py --headless -u 100 -r 10 -t 10s
- --headless 表示不启动界面
- -u [数量] 设置虚拟用户数
- -r [时间] 每秒钟启动虚拟用户数
- -t [时间] 设置测试多长时间
----秒,数字后面加上 s ,例如 10s
----分钟,数字后面加上 m,例如 1m
----小时,数字后面加上 h,例如 1h
locust 分布式
运行多个进程的Locust, 使用 --master
命令启动主进程, 使用 --worker
启动从属进程
主进程控制从进程并收集从进程的测试结果;从进程负责执行测试,并反馈结果给主进程。
启动无界面主进程
locust -f locustfile.py --master --headless --master-bind-host=127.0.0.1 --master-bind-port=8090 -t 15s -u 5 -r 1
-
--master
指明当前为主进程 -
--master-bind-host=[ip 地址]
指明主进程绑定的地址 -
--master-bind-port=[端口号]
指明主进程的绑定的端口号
启动后,如果当前没有从进程的话会显示等待从进程。当从进程启动后便可以执行脚本了。
启动无界面的从进程:
locust -f locustfile.py --worker --master-host=127.0.0.1 --master-port=8090 --headless
-
--worker
指明当前为从进程 -
--master-host=[ip 地址]
指明需要连接的主进程的 ip 地址 -
--master-port=[端口号]
指明需要连接的主进程的端口
从进程启动后将开始执行测试,主进程将会统计结果。
动作集合 TaskSet
对于 web 网站来说,通常情况下,网站的一个页面中用户能够执行多个动作,并且这些动作通常是为了某个特定的目标,比如注册页面,用户可以执行的动作: 验证用户名,手机号码是否重复,更新图形验证码,获取手机验证码等。这些动作的目标就是为了完成用户注册。我们可以把这些动作组合在一起。
在 locust 中我们通过 TaskSet 类来组合多个动作,然后在 HttpUser 对象中引入 taskset ,引入方式是在 tasks 属性列表中,添加 taskset 类。
taskset 也是可以嵌套的,我们在 taskset 的 tasks 属性中添加要嵌套的其他 taskset 类。类似于: 网站首页包含用户登录页面,用户登录页面包含用户注册页面这种关系
- 注意: taskset 相当于一个特殊的动作,如果 locust 进入到 taskset 中执行其中的动作,那么 locust 会一直执行 taskset 中的动作,那么 WebUser 中定义的动作就不会被执行。 需要在 taskset 中调用 self.ingterrupt() 来退出,一般可以定义一个动作,在这个动作中调用 self.interrupt()
import uuid
from locust import HttpUser, task, between, TaskSet
# 注册
class RegistTaskSet(TaskSet):
# 用户名是否重复
@task
def username_count(self):
username = 'zhangsan'
self.client.get(f'/usernames/{username}/count/')
# 手机号是否重复
@task
def mobile_count(self):
mobile = '13712345678'
self.client.get(f'/mobiles/{mobile}/count/')
# 获取图形验证码
@task
def image_codes(self):
image_code_id = str(uuid.uuid4())
self.client.get(f'/image_codes/{image_code_id}/')
@task
def stop(self):
print('退出当前动作集合')
self.interrupt()
# 登录
class UserLoginTaskSet(TaskSet):
# 嵌套用户注册
tasks = [RegistTaskSet]
@task
def username_login(self):
auth = {'username': 'root', 'password': 'root1234'}
self.client.post('/login/', json=auth)
@task
def stop(self):
print('退出当前动作集合')
self.interrupt()
# 模拟用户
class WebUser(HttpUser):
# 包含用户登录
tasks = [UserLoginTaskSet]
# 服务器的地址
host = 'http://localhost:8000'
# 下一个任务执行之前等待的时间,用于模式用户的思考时间
wait_time = between(3, 5) # 这里使用随机 3,5 秒钟
@task
def search(self):
# 搜索
self.client.get('/search/')
有序动作集合 SequentialTaskSet
我们在 HttpUser 以及 TaskSet 下定义动作后,locust 是从这些动作中随机选择一个动作来执行,有时候我们希望用户执行的动作是有序的,比如:
1.验证用户名是否重复
2.验证手机号码是否重复
3.验证获取图形验证码
4.获取手机验证码
5.注册
这时候就需要通过 SequentialTaskSet 来定义动作集合,他会按照定义 task 定义的顺序来一次调用。
- 使用的方式和 TaskSet 一样,但是 task 装饰器的权重无效了
from locust import HttpUser, task, between, SequentialTaskSet
class RegistSeqTaskSet(SequentialTaskSet):
# 用户名是否重复
@task
def username_count(self):
print('验证用户名')
username = 'zhangsan'
self.client.get(f'/usernames/{username}/count/')
# 手机号是否重复
@task
def mobile_count(self):
print('验证手机号')
mobile = '13712345678'
self.client.get(f'/mobiles/{mobile}/count/')
# 获取图形验证码
@task
def image_codes(self):
print('获取图形验证码')
image_code_id = str(uuid.uuid4())
self.client.get(f'/image_codes/{image_code_id}/')
# 模拟用户
class WebUser(HttpUser):
# 包含用户登录
tasks = [RegistSeqTaskSet]
# 服务器的地址
host = 'http://localhost:8000'
# 下一个任务执行之前等待的时间,用于模式用户的思考时间
wait_time = between(3, 5) # 这里使用随机 3,5 秒钟
断言
我们需要判断一个请求是失败还是成功,需要给请求参数携带 catch_response=True ,这样请求方法就会返回一个上下文管理器
这个上下文管理器返回的是响应对象,我们通过调用响应对象的 failure(消息) 来标记本次请求失败,
通过 catch_response=True 参数来进行断言。
from locust import HttpUser, task, between
class WebUser(HttpUser):
wait_time = between(1, 5)
host = 'http://localhost:8000'
@task
def info(self):
with self.client.get('/info/', catch_response=True) as resp:
res = resp.json()
if res['code'] != 0:
resp.failure(res['errmsg'])
网友评论