性能测试
为了保证系统在线上稳定运行,需要对系统做性能测试,看是否可抗住线上流量。
性能测试是一个总称,具体可以细分为 性能测试、负载测试、压力测试和稳定性测试。
- 性能测试
测试 在正常的访问范围之内,是否可以达到性能预期。 - 负载测试
在对系统不断增加并发,直到某项资源达到安全临界值(饱和状态),即测试性能最高的点的位置。 - 压力测试
继续加压,系统性能快速下降,最终奔溃,即测试系统的崩溃点在哪里。 - 稳定性测试
稳定性测试算是性能测试的一种,即测试长时间的高并发的压力下,系统是否和原来一样稳定。
可以看出 性能测试、负载测试、压力测试是连续的三个阶段。
性能测试曲线
一般系统性能呈现如下趋势:
a->b阶段为性能测试,一般数据会呈线性增长。到达b后开始进入负载测试阶段,再增加并发数,性能不在显著增加,一直到c。c点系统到达了饱和,再施加压力,系统性能会迅速下降,达到d点,即系统崩溃不可用。
与上图对应,系统是响应时间变化如下:
a->b响应时间不变。b->c 响应时间增加。其实,b点是正常运行的点,c点本身就是崩溃点,维持一段时间,就很可能奔溃。到d点系统崩溃,不可用。
总之,性能测试就是不断增加并发,看TPS值。
性能测试小工具
写一个小程序,可以测试某一url的性能。并发数量和总发送的包数可以自己控制。返回响应的平均时间和95%响应时间。
创建文件performance_test.py,编写程序如下:
#/usr/bin python
# -*- encoding: utf-8 -*-
import argparse
import multiprocessing
import urllib2
import time
import logging
import conf
def performance_test(url, concurrency, max_request):
"""main"""
manager = multiprocessing.Manager()
request_num = manager.Value("d",0)
resp_time=manager.list()
process_pool = []
# start performance test
# request_process(url, request_num, max_request, resp_time)
for num in xrange(0, concurrency):
p = multiprocessing.Process(target=request_process, args=(url, request_num, max_request, resp_time,))
process_pool.append(p)
for p in process_pool:
p.start()
for p in process_pool:
p.join()
# p.close()
print len(resp_time)
resp_time_info = resp_time
resp_time_info.sort()
average = sum(resp_time_info) / len(resp_time_info)
resp_95 = resp_time_info[int(0.95 * len(resp_time_info))]
return average, resp_95
def request_process(url, request_num, max_request, resp_time):
"""get url until request_num large or equal max_request"""
error_num = 0
while request_num.value <= max_request:
request_num.value += 1
start_time = time.time()
header = {}
try:
req = urllib2.Request(url)
resp = urllib2.urlopen(req, timeout=conf.TIMEOUT)
if resp.code != 200:
logging.warn("get url %s error, resp_code: %s" % (url, resp.code))
error_num += 1
request_num.value -= 1
else:
logging.debug("get url %s success" % url)
error_num = 0
resp_time.append(time.time() - start_time)
except Exception as err:
logging.warn("get url %s error: %s" % (url, err))
error_num += 1
request_num.value -= 1
if error_num > 3:
logging.error("too much error...")
break
def get_argparse():
"""get parse input args"""
parser = argparse.ArgumentParser(description='Performance test tool. '
'use like: python performance_test.py -c 10 -n 100 -u http://www.baidu.com')
parser.add_argument('-c', '--concurrency', dest='concurrency', type=int, default=conf.CONCORRENCY,
help='Number of multiple request to make at a time')
parser.add_argument('-n', '--requests', dest='requests', type=int, default=conf.MAX_REQUESTS,
help='Number of requests to perform')
parser.add_argument('-u', '--url', dest='urls', type=str, required=True,
help='url to request')
args = parser.parse_args()
return args
if __name__ == "__main__":
args = get_argparse()
concurrency = args.concurrency
max_request = args.requests
urls = args.urls
print performance_test(urls, concurrency, max_request)
通过启动的进程数量控制并发。通过进程间共享变量实现总发送数据量的控制(由于没有严格的锁,因此,时间发送的数据会和设置的有一点出入)。
运行:
python performance_test.py -c 10 -n 100 -u http://www.baidu.com
得到结果:
(0.26014810571303737, 0.3801710605621338)
网友评论