美文网首页
熔断器设计

熔断器设计

作者: 放开那个BUG | 来源:发表于2021-01-10 15:21 被阅读0次

    1、前言

    看别人 RPC 框架代码有熔断器的代码,但是对于熔断器并不是很了解,于是了解一下熔断器设计。熔断器跟限流器不同,它主要是保证服务调用者在调用异常服务时,快速返回失败结果,避免大量的同步等待,造成服务雪崩,并且能在一段时间后继续侦测请求执行结果, 提供恢复服务调用的可能。而限流器是单纯的限流,不让大量流量把服务搞挂调,服务本身是正常的。

    熔断器的基本原理如下:


    熔断器原理

    熔断器本身就是一个状态机。

    1.关闭状态:熔断器的初始化状态,该状态下允许请求通过。当失败超过阀值,转入打开状态,
    2.打开状态:熔断状态,该状态下不允许请求通过,当进入该状态经过一段时间,进入半开状态。
    3.半开状态:在半开状态期间,允许部分请求通过,在半开期间,观察失败状态是否超过阀值。如果没有超过进入关闭状态,如果超过了进入关闭状态。如此往复

    2、设计

    整个设计使用状态模式(即外部表现行为是 CircuitBreaker,内部状态变化由各个熔断器的 state 类变化),首先着眼点应该在 CircuitBreaker 上,由他理解各个状态的变化过程。

    熔断器对外暴露接口:

    package com.example.demo.circulbreaker;
    
    /**
     * 熔断器接口
     */
    public interface CircuitBreaker {
        /**
         * 重置熔断器
         */
        void reset();
    
        /**
         * 是否允许通过熔断器
         */
        boolean canPassCheck();
    
        /**
         * 统计失败次数
         */
        void countFailNum();
    }
    

    熔断器公共抽象类:

    package com.example.demo.circulbreaker;
    
    /**
     * User: Rudy Tan
     * Date: 2018/9/21
     *
     * 基础熔断器
     */
    public abstract class AbstractCircuitBreaker implements CircuitBreaker {
        /**
         * 熔断器当前状态,声明成 volatile 是防止更改状态的时候,其他线程拿着旧状态做操作
         */
        private volatile CBState state = new CloseCBState();
    
        /**
         * 在熔断器关闭的情况下,在多少秒内失败多少次进入,熔断打开状态(默认10分钟内,失败10次进入打开状态)
         */
        public String thresholdFailRateForClose = "10/600";
    
        /**
         * 在熔断器打开的情况下,熔断多少秒进入半开状态,(默认熔断30分钟)
         */
        public int thresholdIdleTimeForOpen = 1800;
    
        /**
         * 在熔断器半开的情况下, 在多少秒内放多少次请求,去试探(默认10分钟内,放10次请求)
         */
        public String thresholdPassRateForHalfOpen = "10/600";
    
        /**
         * 在熔断器半开的情况下, 试探期间,如果有超过多少次失败的,重新进入熔断打开状态,否者进入熔断关闭状态。
         */
        public int thresholdFailNumForHalfOpen = 1;
    
    
        public CBState getState() {
            return state;
        }
    
        public void setState(CBState state) {
            // 当前状态不能切换为当前状态
            CBState currentState = getState();
            if (currentState.getStateName().equals(state.getStateName())){
                return;
            }
    
            // 多线程环境加锁
            synchronized (this){
                // 二次判断
                currentState = getState();
                if (currentState.getStateName().equals(state.getStateName())){
                    return;
                }
    
                // 更新状态
                this.state = state;
                System.out.println("熔断器状态转移:" + currentState.getStateName() + "->" + state.getStateName());
            }
        }
    }
    
    

    熔断器实现类(各种实现都行,作者是用来简化实现):

    package com.example.demo.circulbreaker;
    
    /**
     * User: Rudy Tan
     * Date: 2018/9/22
     *
     * 本地熔断器(把它当成了工厂了)
     */
    public class LocalCircuitBreaker extends AbstractCircuitBreaker {
        public LocalCircuitBreaker(String failRateForClose,
                                   int idleTimeForOpen,
                                   String passRateForHalfOpen, int failNumForHalfOpen){
            this.thresholdFailRateForClose = failRateForClose;
            this.thresholdIdleTimeForOpen = idleTimeForOpen;
            this.thresholdPassRateForHalfOpen = passRateForHalfOpen;
            this.thresholdFailNumForHalfOpen = failNumForHalfOpen;
        }
    
        public void reset() {
            this.setState(new CloseCBState());
        }
    
        public boolean canPassCheck() {
            return getState().canPassCheck(this);
        }
    
        public void countFailNum() {
            getState().countFailNum(this);
        }
    }
    

    熔断器状态接口:

    package com.example.demo.circulbreaker;
    
    /**
     * 熔断器状态
     */
    public interface CBState {
        /**
         * 获取当前状态名称
         */
        String getStateName();
    
        /**
         * 检查以及校验当前状态是否需要扭转
         */
        void checkAndSwitchState(AbstractCircuitBreaker cb);
    
        /**
         * 是否允许通过熔断器
         */
        boolean canPassCheck(AbstractCircuitBreaker cb);
    
        /**
         * 统计失败次数
         */
        void countFailNum(AbstractCircuitBreaker cb);
    }
    

    各个熔断器状态:

    关闭状态:

    package com.example.demo.circulbreaker;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * User: Rudy Tan
     * Date: 2018/9/21
     *
     * 熔断器-关闭状态
     */
    public class CloseCBState implements CBState {
    
        /**
         * 进入当前状态的初始化时间
         */
        private long stateTime = System.currentTimeMillis();
    
        /**
         * 关闭状态,失败计数器,以及失败计数器初始化时间
         */
        private AtomicInteger failNum = new AtomicInteger(0);
        private long failNumClearTime = System.currentTimeMillis();
    
        public String getStateName() {
            // 获取当前状态名称
            return this.getClass().getSimpleName();
        }
    
        public void checkAndSwitchState(AbstractCircuitBreaker cb) {
            // 阀值判断,如果失败到达阀值,切换状态到打开状态
            long maxFailNum = Long.valueOf(cb.thresholdFailRateForClose.split("/")[0]);
            if (failNum.get() >= maxFailNum){
                cb.setState(new OpenCBState());
            }
        }
    
        public boolean canPassCheck(AbstractCircuitBreaker cb) {
            // 关闭状态,请求都应该允许通过
            return true;
        }
    
        public void countFailNum(AbstractCircuitBreaker cb) {
            // 检查计数器是否过期了,如果过期重新计数
            long period = Long.valueOf(cb.thresholdFailRateForClose.split("/")[1]) * 1000;
            long now = System.currentTimeMillis();
            // 过期
            if (failNumClearTime + period <= now){
                failNum.set(0);
            }
            // 失败计数
            failNum.incrementAndGet();
    
            // 检查是否切换状态
            checkAndSwitchState(cb);
        }
    }
    

    打开状态:

    package com.example.demo.circulbreaker;
    
    /**
     * User: Rudy Tan
     * Date: 2018/9/21
     *
     * 熔断器-打开状态
     */
    public class OpenCBState implements CBState {
        /**
         * 进入当前状态的初始化时间
         */
        private long stateTime = System.currentTimeMillis();
    
        public String getStateName() {
            // 获取当前状态名称
            return this.getClass().getSimpleName();
        }
    
        public void checkAndSwitchState(AbstractCircuitBreaker cb) {
            // 打开状态,检查等待时间是否已到,如果到了就切换到半开状态
            long now = System.currentTimeMillis();
            long idleTime = cb.thresholdIdleTimeForOpen * 1000L;
            if (stateTime + idleTime <= now){
                cb.setState(new HalfOpenCBState());
            }
        }
    
        public boolean canPassCheck(AbstractCircuitBreaker cb) {
            // 检测状态
            checkAndSwitchState(cb);
            return false;
        }
    
        public void countFailNum(AbstractCircuitBreaker cb) {
            // nothing
        }
    }
    

    半开状态:

    package com.example.demo.circulbreaker;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * User: Rudy Tan
     * Date: 2018/9/21
     *
     * 熔断器-半开状态
     */
    public class HalfOpenCBState implements CBState {
    
        /**
         * 进入当前状态的初始化时间
         */
        private long stateTime = System.currentTimeMillis();
    
        /**
         * 半开状态,失败计数器
         */
        private AtomicInteger failNum = new AtomicInteger(0);
    
        /**
         * 半开状态,允许通过的计数器
         */
        private AtomicInteger passNum = new AtomicInteger(0);
    
    
        public String getStateName() {
            // 获取当前状态名称
            return this.getClass().getSimpleName();
        }
    
        public void checkAndSwitchState(AbstractCircuitBreaker cb) {
            // 判断半开时间是否结束
            long idleTime = Long.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[1]) * 1000L;
            long now = System.currentTimeMillis();
            if (stateTime + idleTime <= now){
                // 如果半开状态已结束,失败次数是否超过了阀值
                int maxFailNum = cb.thresholdFailNumForHalfOpen;
                if (failNum.get() >= maxFailNum){
                    // 失败超过阀值,认为服务没有恢复,重新进入熔断打开状态
                    cb.setState(new OpenCBState());
                }else {
                    // 没超过,认为服务恢复,进入熔断关闭状态
                    cb.setState(new CloseCBState());
                }
            }
        }
    
        public boolean canPassCheck(AbstractCircuitBreaker cb) {
            // 检查是否切换状态
            checkAndSwitchState(cb);
    
            // 超过了阀值,不再放量
            int maxPassNum = Integer.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[0]);
            if (passNum.get() > maxPassNum){
                return false;
            }
            // 检测是否超过了阀值
            if (passNum.incrementAndGet() <= maxPassNum){
                return true;
            }
            return false;
        }
    
        public void countFailNum(AbstractCircuitBreaker cb) {
            // 失败计数
            failNum.incrementAndGet();
    
            // 检查是否切换状态
            checkAndSwitchState(cb);
        }
    }
    

    测试类:

    package com.example.demo.circulbreaker;
    
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * User: Rudy Tan
     * Date: 2018/8/27
     */
    public class App {
    
        public static void main(String[] args) throws InterruptedException {
            final int maxNum = 200;
            final CountDownLatch countDownLatch = new CountDownLatch(maxNum);
    
            final CircuitBreaker circuitBreaker = new LocalCircuitBreaker("5/20", 10, "5/10", 2);
    
            for (int i=0; i < maxNum; i++){
                new Thread(new Runnable() {
                    public void run() {
                        // 模拟随机请求
                        try {
                            Thread.sleep(new Random().nextInt(20) * 1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                        try{
                            // 过熔断器
                            if (circuitBreaker.canPassCheck()){
                                // do something
                                System.out.println("正常业务逻辑操作");
    
                                // 模拟后期的服务恢复状态
                                if (countDownLatch.getCount() >= maxNum/2){
                                    // 模拟随机失败
                                    if (new Random().nextInt(2) == 1){
                                        throw new Exception("mock error");
                                    }
                                }
                            } else {
                                System.out.println("拦截业务逻辑操作");
                            }
                        }catch (Exception e){
                            System.out.println("业务执行失败了");
                            // 熔断器计数器
                            circuitBreaker.countFailNum();
                        }
    
                        countDownLatch.countDown();
                    }
                }).start();
    
                // 模拟随机请求
                try {
                    Thread.sleep(new Random().nextInt(5) * 100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            countDownLatch.await();
            System.out.println("end");
        }
    }
    

    3、参考资料

    https://www.jianshu.com/p/fad81d945e2d

    相关文章

      网友评论

          本文标题:熔断器设计

          本文链接:https://www.haomeiwen.com/subject/mpvkaktx.html