golang-熔断器

作者: Best博客 | 来源:发表于2021-03-01 20:47 被阅读0次

熔断器

go-zero在breaker里面基于google的sre算法实现了熔断器逻辑,并在redis等客户端操作的时候引入了熔断器

算法公式
image.png

go-zero 实现了这个公式,并且作为底层组件很好被嵌入使用,在redis的调用就有很好的表现.

  1. 定义接口
  2. 底层组件实现该接口,
  3. 初始化breaker各属性对象,依赖对象数据类型定义的都是interface
    https://github.com/tal-tech/go-zero/blob/master/core/breaker/breaker.go

使用demo

import (
    "errors"
    "fmt"
    "github.com/stretchr/testify/assert"
    "github.com/tal-tech/go-zero/core/stat"
    "strconv"
    "strings"
    "testing"
)
// 熔断服务会将闭包每次返回的结果统计起来
// 最终在sre公式中通过计算得到是否放行的标志
func TestGoogleBreaker(t *testing.T) {
    br := NewBreaker()
    for i := 0; i < 500; i++ {
        br.Do(
            func() error {

                fmt.Println("如果闭包一直执行那么i就会连续的被打印", i)

                if i > 4 {
                    err := errors.New("err info")
                    return err
                } else {
                    return nil
                }

            },
        )
    }
}
image.png

redis调用使用demo

package redis

import (
    "errors"
    "fmt"
    "strconv"
    "time"

    red "github.com/go-redis/redis"
    "github.com/tal-tech/go-zero/core/breaker"
    "github.com/tal-tech/go-zero/core/mapping"
)
//redis客户端初始化过程中,特意将breaker作为brk依赖注入,使得go-zero底层redis客户端组件自带熔断保护措施
// 参照这我们使用breaker的灵活性就很大了,  闭包。。
func NewRedis(redisAddr, redisType string, redisPass ...string) *Redis {
    var pass string
    for _, v := range redisPass {
        pass = v
    }

    return &Redis{
        Addr: redisAddr,
        Type: redisType,
        Pass: pass,
        brk:  breaker.NewBreaker(),//redis客户端初始化过程中,特意将breaker作为brk依赖注入
    }
}


// 这是找到go-zero的redis客户端del过程
// 看见没 s.brk.DoWithAcceptable, go-zero将所有操作redis的过程都以闭包的形式包入breker中
func (s *Redis) Del(keys ...string) (val int, err error) {
    err = s.brk.DoWithAcceptable(func() error {
        conn, err := getRedis(s)
        if err != nil {
            return err
        }

        if v, err := conn.Del(keys...).Result(); err != nil {
            return err
        } else {
            val = int(v)
            return nil
        }
    }, acceptable)

    return
}

代码分析:


// 闭包是以参数传入该方法的
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
   // 该方法每次在执行闭包之前会先通过 b.accept(); 验证当前是否还能执行闭包 
   if err := b.accept(); err != nil {
        if fallback != nil {
            return fallback(err)
        } else {
            return err
        }
    }

    defer func() {
        if e := recover(); e != nil {
            b.markFailure()
            panic(e)
        }
    }()

    err := req()
    if acceptable(err) {
                // 闭包执行返回的err为nil走这里,进行数据统计
        b.markSuccess()
    } else {
                // 闭包执行返回的err 不为 nil走这里
        b.markFailure()
    }

    return err
}


// sre 公式的真是写照了
// 闭包每次执行之前都会先执行这个方法,看一下还能不能执行闭包
func (b *googleBreaker) accept() error {
    accepts, total := b.history()  //这个方法返回当前时刻 闭包返回nil,与非nil的统计数值,用于sre公式计算
    weightedAccepts := b.k * float64(accepts)
    // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
    dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
    if dropRatio <= 0 {
        return nil
    }

    if b.proba.TrueOnProba(dropRatio) {
        return ErrServiceUnavailable
    }

    return nil
}
// 闭包执行成功,其实还是调用add
func (b *googleBreaker) markSuccess() {
    b.stat.Add(1)
}


func (rw *RollingWindow) Add(v float64) {
    rw.lock.Lock()
    defer rw.lock.Unlock()
    rw.updateOffset()  //核心。。
    rw.win.add(rw.offset, v)
}

// 这里就是计算 sre 算法用到的 参数数字的过程了
// 一开始在初始化breaker的时候,10S为单位分成40份
// 这里用到了一个巧妙方式自动初始化一部分区间的数值,
// 也就是算法一开始所描述的,当错误次数过高熔断被触发,当一段时间过期后便会重新再去调用闭包,
// 只所以会过一段时间去调用,是因为sre公式又成立了,之所以成立了就是因为这里随着时间的推移初始化掉
//了一部分区间的数值
func (rw *RollingWindow) updateOffset() {
        // breaker初始化过程中,|--|--|--|--|--|...   每个区间时间长度为: 10 * time.Second / 40,对应代码为数组的40个下标,不同时刻 闭包执行的统计值的累加结果存在不同下标的对象中进行累加  ,这里的span()就是根据当前时刻决定下标移动的跨度
        span := rw.span()      
        if span > 0 {  
        offset := rw.offset //当前区间的下标
        // reset expired buckets
        start := offset + 1  // 新的开始下标数值
        steps := start + span //新的结束下标数值
        var remainder int
                // 到底了,就从头开始清洗数据
        if steps > rw.size {
            remainder = steps - rw.size
            steps = rw.size
        }
                // 没到底,就将该范围的数值重置为0,这也就是在表达,随着时间的推移,会有一部分数据给归零处理
//使得sre公式又开始成立,也就是上游服务过载让其休息下,过一段时间再来调用看看~
        for i := start; i < steps; i++ {
            rw.win.resetBucket(i)
            offset = i
        }
        for i := 0; i < remainder; i++ {
            rw.win.resetBucket(i)
            offset = i
        }
        rw.offset = offset
        rw.lastTime = timex.Now()
    }
}

 

参考文献

https://blog.csdn.net/jfwan/article/details/109328874

相关文章

  • golang-熔断器

    熔断器 go-zero在breaker里面基于google的sre算法实现了熔断器逻辑,并在redis等客户端操作...

  • 熔断器的选择方法(3)

    熔断器的选择 (1)UN熔断器≥UN线路. (2)IN熔断器≥IN线路. (3)熔断器的最大分断能力应大于被保护线...

  • 不怕难之Spring Cloud系列之Hystrix

    一、简介 1. 引言 什么是熔断器? 为什么要有熔断器? 熔断器有哪些考虑指标? 熔断器有哪些适用的设计模式? 熔...

  • 熔断器设计

    1、前言 看别人 RPC 框架代码有熔断器的代码,但是对于熔断器并不是很了解,于是了解一下熔断器设计。熔断器跟限流...

  • go 变量声明初始化、new、make

    title: go 变量声明初始化、new、makedate: 2019-01-04tags:- golang- ...

  • 电器原器件说明

    1、熔断器 熔断器(fuse)是指当电流超过规定值时,以本身产生的热量使熔体熔断,断开电路的一种电器。熔断器是根据...

  • springcloud使用(四) 熔断器Hystrix

    熔断器的概念和优点参考 springcloud(四):熔断器Hystrix, 讲的很详细 基于feign的Hyst...

  • Feign调用报错:failed and no fallback

    timed-out and no fallback 这个错误基本是出现在Hystrix熔断器,熔断器的作用是判断该...

  • SpringCloud 之Hystrix熔断器

    熔断器Hystrix 为什么要使用熔断器 什么是Hystrix Hystrix 中文意思就是豪猪 ,因其背上长满...

  • Hystrix的正确理解方式

    hystrix-logo-tagline-640.png 什么是熔断器 熔断器,原本是电路中在电器发生短路时的防止...

网友评论

    本文标题:golang-熔断器

    本文链接:https://www.haomeiwen.com/subject/bvbmtltx.html