美文网首页
读stressTester源码说并发压测

读stressTester源码说并发压测

作者: 靈08_1024 | 来源:发表于2019-05-27 18:53 被阅读0次

第一步 收集

上码:

public StressResult test(int concurrencyLevel, int totalRequests, StressTask stressTask, int warmUpTime) {
        if (stressTask == null) {
            stressTask = this.emptyTestService;
        }
//预执行,确保代码无误
        this.warmUp(warmUpTime, stressTask);
        int everyThreadCount = totalRequests / concurrencyLevel;
// 建立两个开关,一个管理开始时所有的数据预备到位,一个管理结束时所有的数据执行完毕
//开始的开关
        CyclicBarrier threadStartBarrier = new CyclicBarrier(concurrencyLevel);
//结束的开关
        CountDownLatch threadEndLatch = new CountDownLatch(concurrencyLevel);
//失败的次数统计
        AtomicInteger failedCounter = new AtomicInteger();
        StressContext stressContext = new StressContext();
        stressContext.setTestService(stressTask);
        stressContext.setEveryThreadCount(everyThreadCount);
        stressContext.setThreadStartBarrier(threadStartBarrier);
        stressContext.setThreadEndLatch(threadEndLatch);
        stressContext.setFailedCounter(failedCounter);
//设定线程池
        ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
        List<StressThreadWorker> workers = new ArrayList(concurrencyLevel);

        int realTotalRequests;
        StressThreadWorker worker;
//添加到队列和执行队列中的数据分开来写
//将数据添加到执行队列中去
        for(realTotalRequests = 0; realTotalRequests < concurrencyLevel; ++realTotalRequests) {
            worker = new StressThreadWorker(stressContext, everyThreadCount);
            workers.add(worker);
        }
//将执行队列中的数据执行
        for(realTotalRequests = 0; realTotalRequests < concurrencyLevel; ++realTotalRequests) {
            worker = (StressThreadWorker)workers.get(realTotalRequests);
            executorService.submit(worker);
        }
//在此处等待线程池执行完毕
        try {
            threadEndLatch.await();
        } catch (InterruptedException var20) {
            log.error("InterruptedException", var20);
        }
//关闭线程池
        executorService.shutdownNow();
//真是请求的数量需要重新计算,避免上面有除不尽的情况,如并发99,总请求书1000
        realTotalRequests = everyThreadCount * concurrencyLevel;
        int failedRequests = failedCounter.get();
        StressResult stressResult = new StressResult();
        StressTester.SortResult sortResult = this.getSortedTimes(workers);
        List<Long> allTimes = sortResult.allTimes;
        stressResult.setAllTimes(allTimes);
        List<Long> trheadTimes = sortResult.trheadTimes;
        long totalTime = (Long)trheadTimes.get(trheadTimes.size() - 1);
        stressResult.setTestsTakenTime(totalTime);
        stressResult.setFailedRequests(failedRequests);
        stressResult.setTotalRequests(realTotalRequests);
        stressResult.setConcurrencyLevel(concurrencyLevel);
        stressResult.setWorkers(workers);
        return stressResult;
    }

下面贴一下StressThreadWorker的源码:


package com.taobao.stresstester.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StressThreadWorker implements Runnable {
    private StressTask service;
    private CyclicBarrier threadStartBarrier;
    private CountDownLatch threadEndLatch;
    private AtomicInteger failedCounter = null;
    private int count;
    protected static Logger log = LoggerFactory.getLogger(SimpleResultFormater.class);
    private List<Long> everyTimes;

    public StressThreadWorker(StressContext stressContext, int count) {
        this.threadStartBarrier = stressContext.getThreadStartBarrier();
        this.threadEndLatch = stressContext.getThreadEndLatch();
        this.failedCounter = stressContext.getFailedCounter();
        this.count = count;
        this.everyTimes = new ArrayList(count);
        this.service = stressContext.getTestService();
    }

    public List<Long> getEveryTimes() {
        return this.everyTimes;
    }

    public void run() {
        try {
//等数据就位
            this.threadStartBarrier.await();
            this.doRun();
        } catch (Exception var2) {
            log.error("Test exception", var2);
            var2.printStackTrace();
        }

    }

    protected void doRun() throws Exception {
//此处的count数是每个线程需要执行的数,即请求总数/并发线程总数
        for(int i = 0; i < this.count; ++i) {
            long start = System.nanoTime();

            try {
                this.service.doTask();
            } catch (Throwable var12) {
//失败次数统计,所有线程共享 失败次数
                this.failedCounter.incrementAndGet();
            } finally {
//在finally中进行接收数据
//每个work是一个新对象,所以统计的数据不会互相影响,最后的数据放在每个work中
                long var6 = System.nanoTime();
                long limit = var6 - start;
                this.everyTimes.add(limit);
            }
        }

        this.threadEndLatch.countDown();
    }
}

结果整理排序的类:

protected StressTester.SortResult getSortedTimes(List<StressThreadWorker> workers) {
        List<Long> allTimes = new ArrayList();
        List<Long> trheadTimes = new ArrayList();
        Iterator var5 = workers.iterator();

        while(var5.hasNext()) {
            StressThreadWorker worker = (StressThreadWorker)var5.next();
            List<Long> everyWorkerTimes = worker.getEveryTimes();
            long workerTotalTime = StatisticsUtils.getTotal(everyWorkerTimes);
            trheadTimes.add(workerTotalTime);
            Iterator var10 = everyWorkerTimes.iterator();

//将所有的每次执行的时间汇总
            while(var10.hasNext()) {
                Long time = (Long)var10.next();
                allTimes.add(time);
            }
        }
//此处按照大小排序,在后面取百分比多少的运行时间时,直接按照索引取数值
        Collections.sort(allTimes);
        Collections.sort(trheadTimes);
        StressTester.SortResult result = new StressTester.SortResult();
        result.allTimes = allTimes;
        result.trheadTimes = trheadTimes;
        return result;
    }

    class SortResult {
        List<Long> allTimes;
        List<Long> trheadTimes;

        SortResult() {
        }
    }

第二步 格式化输出

public void format(StressResult stressResult, Writer writer) {
//测试总耗时
        long testsTakenTime = stressResult.getTestsTakenTime();
//测试总请求数
        int totalRequests = stressResult.getTotalRequests();
//测试并发线程数
        int concurrencyLevel = stressResult.getConcurrencyLevel();
//纳秒转毫秒
        float takes = StatisticsUtils.toMs(testsTakenTime);
//此处存放的是所有请求的时间集合
        List<Long> allTimes = stressResult.getAllTimes();
//聚合所有时间,求和
        long totaleTimes = StatisticsUtils.getTotal(allTimes);
//TPS(每秒传输的请求)=总线程数*总请求数/总耗时     最后再转换为秒级别
        float tps = 1.0E9F * (float)concurrencyLevel * ((float)totalRequests / (float)totaleTimes);
//计算平均时间(此处是纳秒)
        float averageTime = StatisticsUtils.getAverage(totaleTimes, totalRequests);
//每个线程的平均时间
        float onTheadAverageTime = averageTime / (float)concurrencyLevel;
//计算指定百分比对应索引,直接获取索引对应的数字,即百分比多少的数据是在多少纳秒之前执行的
        int count_50 = totalRequests / 2;
        int count_66 = totalRequests * 66 / 100;
        int count_75 = totalRequests * 75 / 100;
        int count_80 = totalRequests * 80 / 100;
        int count_90 = totalRequests * 90 / 100;
        int count_95 = totalRequests * 95 / 100;
        int count_98 = totalRequests * 98 / 100;
        int count_99 = totalRequests * 99 / 100;
        long longestRequest = (Long)allTimes.get(allTimes.size() - 1);
        long shortestRequest = (Long)allTimes.get(0);
        StringBuilder view = new StringBuilder();
        view.append(" Concurrency Level:\t").append(concurrencyLevel).append("--并发数");
        view.append("\r\n Time taken for tests:\t").append(takes).append(" ms").append("--测试耗时");
        view.append("\r\n Complete Requests:\t").append(totalRequests).append("--完成测试次数");
        view.append("\r\n Failed Requests:\t").append(stressResult.getFailedRequests()).append("--失败次数");
        view.append("\r\n Requests per second:\t").append(tps).append("--QPS");
        view.append("\r\n Time per request:\t").append(StatisticsUtils.toMs(averageTime)).append(" ms").append("--平均耗时");
        view.append("\r\n Time per request:\t").append(StatisticsUtils.toMs(onTheadAverageTime)).append(" ms (across all concurrent requests)").append("--平均耗时,忽略并发影响");
        view.append("\r\n Shortest request:\t").append(StatisticsUtils.toMs(shortestRequest)).append(" ms").append("--最短耗时");
        view.append("\r\n Percentage of the requests served within a certain time (ms)");
        view.append("\r\n  50%\t").append(StatisticsUtils.toMs((Long)allTimes.get(count_50))).append("--50% 的耗时在0.005703毫秒以下");
        view.append("\r\n  66%\t").append(StatisticsUtils.toMs((Long)allTimes.get(count_66)));
        view.append("\r\n  75%\t").append(StatisticsUtils.toMs((Long)allTimes.get(count_75)));
        view.append("\r\n  80%\t").append(StatisticsUtils.toMs((Long)allTimes.get(count_80)));
        view.append("\r\n  90%\t").append(StatisticsUtils.toMs((Long)allTimes.get(count_90)));
        view.append("\r\n  95%\t").append(StatisticsUtils.toMs((Long)allTimes.get(count_95)));
        view.append("\r\n  98%\t").append(StatisticsUtils.toMs((Long)allTimes.get(count_98)));
        view.append("\r\n  99%\t").append(StatisticsUtils.toMs((Long)allTimes.get(count_99)));
        view.append("\r\n 100%\t").append(StatisticsUtils.toMs(longestRequest)).append(" (longest request)").append("--最长的耗时");

        try {
            writer.write(view.toString());
        } catch (IOException var29) {
            log.error("IOException:", var29);
        }

    }
TPS和QPS的区别

TPS(transcations per second)每秒传输的请求总数。系统级别指标。
QPS(queries per second)每秒查询数。单台服务器级别指标。

相关文章

网友评论

      本文标题:读stressTester源码说并发压测

      本文链接:https://www.haomeiwen.com/subject/lgnhtctx.html