第一步 收集
上码:
/**
 * Runs {@code stressTask} concurrently on {@code concurrencyLevel} worker threads and collects
 * the per-request timings into a {@link StressResult}.
 *
 * <p>Every worker performs {@code totalRequests / concurrencyLevel} calls (integer division), so
 * the number of requests actually issued and reported is
 * {@code (totalRequests / concurrencyLevel) * concurrencyLevel}; e.g. 1000 requests at
 * concurrency 99 yields 990 real requests.
 *
 * <p>If the calling thread is interrupted while waiting for the workers, the interruption is
 * logged, the thread's interrupt status is restored, the pool is shut down, and a result is still
 * built from whatever the workers recorded so far.
 *
 * @param concurrencyLevel number of worker threads (also the size of the start barrier, end
 *     latch and thread pool)
 * @param totalRequests requested total number of task invocations
 * @param stressTask task to execute; when {@code null}, {@code this.emptyTestService} is used
 * @param warmUpTime value handed to {@code warmUp} before the measured run
 * @return the collected result (sorted request times, total time, failure count, workers)
 */
public StressResult test(int concurrencyLevel, int totalRequests, StressTask stressTask, int warmUpTime) {
    if (stressTask == null) {
        stressTask = this.emptyTestService;
    }
    // Pre-run the task before measuring, so that broken task code surfaces early.
    this.warmUp(warmUpTime, stressTask);
    int everyThreadCount = totalRequests / concurrencyLevel;

    // Two gates: the barrier releases all workers at the same moment once every one of them is
    // ready; the latch lets this thread wait until every worker has finished.
    CyclicBarrier threadStartBarrier = new CyclicBarrier(concurrencyLevel);
    CountDownLatch threadEndLatch = new CountDownLatch(concurrencyLevel);
    // Failure count shared by all workers.
    AtomicInteger failedCounter = new AtomicInteger();

    StressContext stressContext = new StressContext();
    stressContext.setTestService(stressTask);
    stressContext.setEveryThreadCount(everyThreadCount);
    stressContext.setThreadStartBarrier(threadStartBarrier);
    stressContext.setThreadEndLatch(threadEndLatch);
    stressContext.setFailedCounter(failedCounter);

    // One pool thread per worker, otherwise the start barrier could never trip.
    ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
    // Workers are created first and submitted afterwards, in two separate passes.
    List<StressThreadWorker> workers = new ArrayList<StressThreadWorker>(concurrencyLevel);
    for (int i = 0; i < concurrencyLevel; ++i) {
        workers.add(new StressThreadWorker(stressContext, everyThreadCount));
    }

    try {
        for (StressThreadWorker worker : workers) {
            executorService.submit(worker);
        }
        // Block until every worker has counted the latch down.
        threadEndLatch.await();
    } catch (InterruptedException e) {
        log.error("InterruptedException", e);
        // Restore the flag so callers further up the stack can observe the interruption.
        Thread.currentThread().interrupt();
    } finally {
        // Always release the pool threads, even if submission or waiting failed.
        executorService.shutdownNow();
    }

    // Recompute the real request count, because the division above may have truncated.
    int realTotalRequests = everyThreadCount * concurrencyLevel;
    int failedRequests = failedCounter.get();

    StressResult stressResult = new StressResult();
    StressTester.SortResult sortResult = this.getSortedTimes(workers);
    List<Long> allTimes = sortResult.allTimes;
    stressResult.setAllTimes(allTimes);
    // Per-worker totals are sorted ascending, so the last entry is the slowest worker; its total
    // is reported as the time taken by the whole test.
    List<Long> trheadTimes = sortResult.trheadTimes;
    long totalTime = trheadTimes.get(trheadTimes.size() - 1);
    stressResult.setTestsTakenTime(totalTime);
    stressResult.setFailedRequests(failedRequests);
    stressResult.setTotalRequests(realTotalRequests);
    stressResult.setConcurrencyLevel(concurrencyLevel);
    stressResult.setWorkers(workers);
    return stressResult;
}
下面贴一下 StressThreadWorker 的源码:
package com.taobao.stresstester.core;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * One stress-test worker: waits on the shared start barrier, invokes the task {@code count}
 * times while recording the duration of every call, and finally counts the shared end latch down.
 *
 * <p>Each worker owns its own timing list, so workers do not interfere with each other's
 * measurements; only the failure counter is shared.
 */
class StressThreadWorker implements Runnable {
    // Logger registered under this class so log output is attributed to the worker.
    protected static Logger log = LoggerFactory.getLogger(StressThreadWorker.class);

    private final StressTask service;
    private final CyclicBarrier threadStartBarrier;
    private final CountDownLatch threadEndLatch;
    private final AtomicInteger failedCounter;
    private final int count;
    private final List<Long> everyTimes;

    /**
     * @param stressContext supplies the start barrier, end latch, failure counter and task
     * @param count number of task invocations this worker performs (total requests divided by
     *     the number of concurrent workers)
     */
    public StressThreadWorker(StressContext stressContext, int count) {
        this.threadStartBarrier = stressContext.getThreadStartBarrier();
        this.threadEndLatch = stressContext.getThreadEndLatch();
        this.failedCounter = stressContext.getFailedCounter();
        this.count = count;
        this.everyTimes = new ArrayList<Long>(count);
        this.service = stressContext.getTestService();
    }

    /**
     * @return the durations recorded by this worker, one entry per invocation, in nanoseconds
     *     (differences of {@link System#nanoTime()})
     */
    public List<Long> getEveryTimes() {
        return this.everyTimes;
    }

    /**
     * Waits until all workers are ready, runs the measured loop, and always releases the end
     * latch — even when the barrier wait fails — so the coordinating thread cannot block forever.
     */
    public void run() {
        try {
            // Wait until every worker is in place.
            this.threadStartBarrier.await();
            this.doRun();
        } catch (InterruptedException e) {
            log.error("Test exception", e);
            // Preserve the interrupt status for the owning executor.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("Test exception", e);
        } finally {
            this.threadEndLatch.countDown();
        }
    }

    /**
     * Invokes the task {@code count} times. A call that throws is counted in the shared failure
     * counter; its duration is recorded regardless of success or failure.
     *
     * @throws Exception declared for overriding implementations; this implementation catches
     *     every {@link Throwable} raised by the task
     */
    protected void doRun() throws Exception {
        for (int i = 0; i < this.count; ++i) {
            long start = System.nanoTime();
            try {
                this.service.doTask();
            } catch (Throwable failure) {
                // The failure count is shared by all workers.
                this.failedCounter.incrementAndGet();
            } finally {
                // Recorded in finally so failed calls contribute a timing sample as well.
                long elapsed = System.nanoTime() - start;
                this.everyTimes.add(elapsed);
            }
        }
    }
}
结果整理排序的方法(以及承载结果的 SortResult 类):
/**
 * Gathers the timings of all workers into two ascending lists.
 *
 * @param workers workers whose recorded times are collected
 * @return a {@code SortResult} whose {@code allTimes} holds every single request time of every
 *     worker and whose {@code trheadTimes} holds one summed time per worker, both sorted
 *     ascending
 */
protected StressTester.SortResult getSortedTimes(List<StressThreadWorker> workers) {
    StressTester.SortResult sorted = new StressTester.SortResult();
    sorted.allTimes = new ArrayList<Long>();
    sorted.trheadTimes = new ArrayList<Long>();

    for (StressThreadWorker each : workers) {
        List<Long> samples = each.getEveryTimes();
        long workerTotal = StatisticsUtils.getTotal(samples);
        sorted.trheadTimes.add(workerTotal);
        // Pool the individual request times of all workers.
        sorted.allTimes.addAll(samples);
    }

    // Ascending order lets later code read a percentile value directly by index.
    Collections.sort(sorted.allTimes);
    Collections.sort(sorted.trheadTimes);
    return sorted;
}
class SortResult {
List<Long> allTimes;
List<Long> trheadTimes;
SortResult() {
}
}
第二步 格式化输出
/**
 * Renders a textual report of {@code stressResult} and writes it to {@code writer}.
 *
 * <p>The report contains the concurrency level, total test time, completed and failed request
 * counts, a requests-per-second figure, average times, the shortest and longest request, and the
 * request time at the 50/66/75/80/90/95/98/99 percent marks. Percentile values are read by index
 * from {@code stressResult.getAllTimes()}, which is therefore expected to be sorted ascending.
 *
 * <p>An {@link IOException} raised by the writer is logged and not rethrown.
 *
 * @param stressResult result to report
 * @param writer destination of the report
 * @throws IndexOutOfBoundsException if the result contains no request times
 */
public void format(StressResult stressResult, Writer writer) {
    // Total time taken by the test.
    long testsTakenTime = stressResult.getTestsTakenTime();
    // Total number of requests.
    int totalRequests = stressResult.getTotalRequests();
    // Number of concurrent threads.
    int concurrencyLevel = stressResult.getConcurrencyLevel();
    // Nanoseconds to milliseconds.
    float takes = StatisticsUtils.toMs(testsTakenTime);
    // Times of all individual requests.
    List<Long> allTimes = stressResult.getAllTimes();
    // Sum of all request times.
    long totaleTimes = StatisticsUtils.getTotal(allTimes);
    // Throughput = threads * requests / summed request time, scaled from nanoseconds to seconds.
    float tps = 1.0E9F * (float) concurrencyLevel * ((float) totalRequests / (float) totaleTimes);
    // Average time per request (still in nanoseconds here).
    float averageTime = StatisticsUtils.getAverage(totaleTimes, totalRequests);
    // Average time divided across the concurrent threads.
    float onTheadAverageTime = averageTime / (float) concurrencyLevel;

    int sampleCount = allTimes.size();
    long longestRequest = allTimes.get(sampleCount - 1);
    long shortestRequest = allTimes.get(0);

    StringBuilder view = new StringBuilder();
    view.append(" Concurrency Level:\t").append(concurrencyLevel).append("--并发数");
    view.append("\r\n Time taken for tests:\t").append(takes).append(" ms").append("--测试耗时");
    view.append("\r\n Complete Requests:\t").append(totalRequests).append("--完成测试次数");
    view.append("\r\n Failed Requests:\t").append(stressResult.getFailedRequests()).append("--失败次数");
    view.append("\r\n Requests per second:\t").append(tps).append("--QPS");
    view.append("\r\n Time per request:\t").append(StatisticsUtils.toMs(averageTime)).append(" ms").append("--平均耗时");
    view.append("\r\n Time per request:\t").append(StatisticsUtils.toMs(onTheadAverageTime)).append(" ms (across all concurrent requests)").append("--平均耗时,忽略并发影响");
    view.append("\r\n Shortest request:\t").append(StatisticsUtils.toMs(shortestRequest)).append(" ms").append("--最短耗时");
    view.append("\r\n Percentage of the requests served within a certain time (ms)");
    // NOTE(review): the suffix on the 50% line hard-codes a sample value (0.005703 ms) instead of
    // the measured one; kept as-is, confirm whether that is intended.
    view.append("\r\n 50%\t").append(StatisticsUtils.toMs(allTimes.get(this.percentileIndex(totalRequests, 50, sampleCount)))).append("--50% 的耗时在0.005703毫秒以下");
    int[] percents = {66, 75, 80, 90, 95, 98, 99};
    for (int percent : percents) {
        view.append("\r\n ").append(percent).append("%\t").append(StatisticsUtils.toMs(allTimes.get(this.percentileIndex(totalRequests, percent, sampleCount))));
    }
    view.append("\r\n 100%\t").append(StatisticsUtils.toMs(longestRequest)).append(" (longest request)").append("--最长的耗时");

    try {
        writer.write(view.toString());
    } catch (IOException e) {
        log.error("IOException:", e);
    }
}

/**
 * Index of the sample at the given percent mark: {@code totalRequests * percent / 100}, computed
 * in {@code long} so large request counts cannot overflow, and capped at the last available
 * sample so a result with fewer samples than {@code totalRequests} cannot index past the end.
 */
private int percentileIndex(int totalRequests, int percent, int sampleCount) {
    long index = (long) totalRequests * percent / 100L;
    return (int) Math.min(index, (long) sampleCount - 1L);
}
TPS和QPS的区别
TPS(transactions per second)每秒处理的事务数。系统级别指标。
QPS(queries per second)每秒查询数。单台服务器级别指标。
网友评论