美文网首页
Sentinel基于并发线程数流控验证

Sentinel基于并发线程数流控验证

作者: gzf_701 | 来源:发表于2020-07-27 17:58 被阅读0次

一、Sentinel基于并发线程数流控

采用基于线程数的限流模式后,我们不需要再显式地去进行线程池隔离,Sentinel 会控制访问该资源的线程数,超出的请求直接拒绝,直到堆积的线程处理完成。相当于是以资源为维度, 隔离出了每一种资源对应的不同线程数。
例如,当应用所依赖的下游应用由于某种原因导致服务不稳定、响应延迟增加,对于调用者来说,意味着吞吐量下降和更多的线程数占用,极端情况下甚至导致线程池耗尽。为应对太多线程占用的情况,业内有使用隔离的方案,比如通过不同业务逻辑使用不同线程池来隔离业务自身之间的资源争抢(线程池隔离)。这种隔离方案虽然隔离性比较好,但是代价就是线程数目太多,线程上下文切换的 overhead 比较大,特别是对低延时的调用有比较大的影响。Sentinel 并发线程数限流不负责创建和管理线程池,而是简单统计当前请求上下文的线程数目,如果超出阈值,新的请求会被立即拒绝,效果类似于信号量隔离。

二、代码以及事例验证过程

1、初始化限流规则, 设置访问该资源的最大的线程数为20, 拒绝策略使用RuleConstant.FLOW_GRADE_THREAD, 基于访问访问资源的线程数限流.


private static void initFlowRule() {
    List rules =new ArrayList();
    FlowRule rule1 =new FlowRule();
    rule1.setResource("methodA");
    // set limit concurrent thread for 'methodA' to 20
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

2、同时启动100个线程, 访问资源, 刚开始时模拟methodB的业务耗时1s, 当运行60s后, methodB的业务耗时降低为20ms

public static void main(String[] args) throws Exception {
        System.out.println(
                "MethodA will call methodB. After running for a while, methodB becomes fast, "
                        + "which make methodA also become fast ");
        //设定时间,60s后将methodB的业务耗时修改为20ms
        tick();
        //设置线程数限流规则
        initFlowRule();
        //同时启动100个线程, 访问资源, 刚开始时模拟methodB的业务耗时1s, 当运行60s后, methodB的业务耗时降低为20ms
        for (int i = 0; i < threadCount; i++) {
            Thread entryThread = new Thread(() -> {
                while (true) {
                    Entry methodA = null;
                    try {
                        TimeUnit.MILLISECONDS.sleep(5);
                        methodA = SphU.entry("methodA");
                        activeThread.incrementAndGet();
                        Entry methodB = SphU.entry("methodB");
                        //模拟业务耗时
                        TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
                        methodB.exit();
                        pass.addAndGet(1);
                    } catch (BlockException e1) {
                        block.incrementAndGet();
                    } catch (Exception e2) {
                        // biz exception
                    } finally {
                        total.incrementAndGet();
                        if (methodA != null) {
                            methodA.exit();
                            activeThread.decrementAndGet();
                        }
                    }
                }
            });
            entryThread.setName("working thread");
            entryThread.start();
        }
    }

methodA = SphU.entry("methodA");对a进行了限流,而methodB只是单纯模拟业务耗时。
activeThread.incrementAndGet();是AtomicInteger的原子操作先加1再获取值。
pass.addAndGet(1);是通过的条数
block.incrementAndGet();是失败的条数
tick()方法开启了另一个工作线程是打印日志信息以及在60s后讲methodB的业务耗时将为20ms

static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;

            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " total qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                        + ", pass:" + oneSecondPass
                        + ", block:" + oneSecondBlock
                        + " activeThread:" + activeThread.get());
                if (seconds-- <= 0) {
                    stop = true;
                }
                if (seconds == 40) {
                    System.out.println("method B is running much faster; more requests are allowed to pass");
                    methodBRunningTime = 20;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                    + ", block:" + block.get());
            System.exit(0);
        }
    }
image.png

活跃线程数就是最初限流的20,pass数从20激增到800左右展示了methodB业务耗时20s之后通过访问资源的请求qps应该是显著增加的,可以看到, 在methodB业务耗时降低的前后, 存活的线程数activeThread大概都是20个左右, 在同样是20个线程的处理请求时, 很明显能看到在methodB业务耗时降低前后, pass由原来的20个左右, 上升到900左右.

image.png image.png

总结:

Sentinel基于并发线程数的流量控制, 能以资源为维度, 为不同的资源, 配置不用的线程数, 控制访问每个资源的线程数, 有点类似于信号量Semaphore方式, 信号量的资源数即为并发线程数, 比如当有请求需要访问被Sentinel保护的资源时, 会首先去获取信号量Semaphore中的资源, 只有成功从semaphore.acquire();获取资源, 才能访问应用资源.

相关文章

网友评论

      本文标题:Sentinel基于并发线程数流控验证

      本文链接:https://www.haomeiwen.com/subject/tdctrktx.html