美文网首页
通过项目学习Go语言之gatekeeper请求流程

通过项目学习Go语言之gatekeeper请求流程

作者: whatiscoding | 来源:发表于2019-12-29 18:45 被阅读0次

    在前面的《通过项目学习Go语言之...》系列文章中,我们对Go语言开发环境的配置以及开发项目最基础的功能之go mod、log做了一些入门的讲解。同时,也对项目gatekeeper中使用的核心组件gin做了一个稍微详细的说明。
    本篇,将通过分析gatekeeper项目结构,理清通过代理到转发到后端realserver以及将代理结果最终响应请求的全流程。
    下面先看一张getekeeper的http网关代理的请求响应全流程图:


    启动请求全景图

    接下来,我们对整个服务的启动和重要的流程节点进行源码的分析。

    启动

    一般服务启动过程中主要是进行初始化、回调钩子和监听注册等基础性工作。gatekeeper也不例外,再启动过程中,进行了配置加载、数据库访问初始化、http和tcp检测注册、系统信号监听等一系列工作。

    //main.go
    func main() {
        conf = flag.String("config", "./conf/dev/", "input config file like ./conf/dev/")
        flag.Parse()
    //初始化mysql redis配置
        lib.InitModule(*conf,[]string{"base","mysql","redis",})
        defer lib.Destroy()
        public.InitMysql()
        public.InitConf()
    
    //初始化配置、配置管理设置实时监控配置变化并刷新
        service.SysConfMgr = service.NewSysConfigManage()
        service.SysConfMgr.InitConfig()
        service.SysConfMgr.MonitorConfig()
    
    //注册请求前验证request方法,进行请求处理前的验证,只允许授权方访问
    //目前支持的是固定的配置模式,可以根据需求进行自定义
        service.RegisterBeforeRequestAuthFunc(service.AuthAppToken)
    
        //注册请求后更改response方法
        service.RegisterModifyResponseFunc(service.FilterCityData([]string{"/gatekeeper/tester_filter/goods_list"}))
    //启动http、tcp监听
        router.HTTPServerRun()
        router.TCPServerRun()
    //注册系统信号监听
        quit := make(chan os.Signal)
        signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
        <-quit
    //注册服务优雅关闭
        router.TCPServerStop()
        router.HTTPServerStop()
        signal.Stop(quit)
    }
    

    Http服务监听是处理来自外部的全部请求,下面看一下HTTPServerRun,getekeeper的http服务是基于gin来实现的。

    //httpserver.go
    //HTTPServerRun 服务启动
    func HTTPServerRun() {
    //设置运行模式 debug
        gin.SetMode(lib.ConfBase.DebugMode)
    //初始化路由
        r := InitRouter()
    //设置http server运行参数
        HTTPSrvHandler = &http.Server{
            Addr:           lib.GetStringConf("base.http.addr"),
            Handler:        r,
            ReadTimeout:    time.Duration(lib.GetIntConf("base.http.read_timeout")) * time.Second,
            WriteTimeout:   time.Duration(lib.GetIntConf("base.http.write_timeout")) * time.Second,
            MaxHeaderBytes: 1 << uint(lib.GetIntConf("base.http.max_header_bytes")),
        }
    //设置recover 以实现遇到panic时服务不中断
    //启动http服务
        go func() {
            defer func() {
                if err := recover(); err != nil {
                    public.SysLogger.Error("HttpServerRun_recover:%v", err)
                }
            }()
            log.Printf(" [INFO] HttpServer %s listening\n",lib.GetStringConf("base.http.addr"))
            if err := HTTPSrvHandler.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                log.Fatalf(" [ERROR] HttpServer %s err:%v\n", lib.GetStringConf("base.http.addr"), err)
            }
        }()
    }
    

    路由注册

    在上面启动http服务的代码中,调用了InitRouter()对相关的路由进行的设置,分别注册了四组路由:admin(管理后台)、gateway(服务探活ping)、cluster(集群管理,reload处理配置变更刷新)、gatekeeper(网关代理服务);另外还启动了一个静态服务器/assets,用于管理后台资源的服务器。下面我们看一下详细的代码:

    //httprouter.go
    //InitRouter 声明http路由
    func InitRouter() *gin.Engine {
    //创建路由r(engine),注册中间件
        router := gin.New()
    //加载gin内置中间件,recovery所有panic是服务长久运行不中断
        router.Use(middleware.Recovery())
    
        //admin
        admin := router.Group("/admin")
        admin.Use(middleware.RequestTraceLog())
        {
            controller.AdminRegister(admin)
        }
    
        //assets
        router.Static("/assets", "./tmpl/green/assets")
    
        //gateway
    //提供探活
        gateway := controller.Gateway{}
        router.GET("/ping", gateway.Ping)
    
        //cluster
    //提供服务配置刷新
        csr:=router.Group("/")
        csr.Use(middleware.ClusterAuth())
        csr.GET("/reload", gateway.Reload)
    //注册gatekeeper路由组,代理所有网关请求
        gw:=router.Group(lib.GetStringConf("base.http.route_prefix"))
    //注册一系列中间件
        gw.Use(
            middleware.RequestTraceLog(),
            middleware.MatchRule(),
            middleware.AccessControl(),
            middleware.HTTPLimit(),
            //todo 拓展中间件
    //核心中间件部分
            middleware.LoadBalance())
        {
            gw.GET("/*action", gateway.Index)
            gw.POST("/*action", gateway.Index)
            gw.DELETE("/*action", gateway.Index)
            gw.OPTIONS("/*action", gateway.Index)
        }
        return router
    }
    

    中间件注册

    在上面的代码中,可以看到默认注册了RequestTraceLog(),MatchRule(),AccessControl(),HTTPLimit(),LoadBalance()五个getekeeper项目自定义的中间件,分配用于请求treace日志记录、规则匹配、访问控制、限流和负载均衡。

    • RequestTraceLog
      通过trace可以快速定位请求链,在进行微服务构建时,使用teaceid机制能够很方便的绘制请求的整个链路,以方便问题的排查和各段服务的耗时记录,优化服务。
    //trace_log.go
    //RequestTraceLog trace中间件
    func RequestTraceLog() gin.HandlerFunc {
        return func(c *gin.Context) {
            RequestInLog(c)
            defer RequestOutLog(c)
            c.Next()
        }
    }
    
    //RequestInLog 请求进入日志
    func RequestInLog(c *gin.Context) {
    //初始化trance 默认分配一个traceid
        traceContext := lib.NewTrace()
    //如果能在header中获取到traceid,则使用已有的id
        if traceID := c.Request.Header.Get("didi-header-rid"); traceID != "" {
            traceContext.TraceId = traceID
        }
        if spanID := c.Request.Header.Get("didi-header-spanid"); spanID != "" {
            traceContext.SpanId = spanID
        }
    ...
    //将trace相关的追踪信息保存进上下文request
    c.Set("startExecTime", time.Now())
        c.Set("trace", traceContext)
        c.Request.Header.Set("didi-header-rid", traceContext.TraceId)
    
        c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), public.ContextKey("trace"), traceContext))
        c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), public.ContextKey("request_url"), c.Request.URL.Path))
        c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
    }
    
    • MatchRule
      用于规则的匹配,基于分解URI实现和管理配置后台相对应的转发逻辑的匹配,
    //match_rule.go
    //MatchRule 匹配模块中间件
    func MatchRule() gin.HandlerFunc {
        return func(c *gin.Context) {
    //初始化geteway service
            gws := service.NewGateWayService(c.Writer, c.Request)
    //进行规则匹配 选出匹配的module
            if err := gws.MatchRule(); err != nil {
                public.ResponseError(c, http.StatusBadRequest, err)
                return
            }
            c.Set(MiddlewareServiceKey,gws)
        }
    }
    
    • AccessControl
      权限控制中间件,用于对外部过来的请求就行访问控制过滤,目前默认支持的包括IP黑白名单、主机白名单以及请求注册函数过滤
    //gate_service.go
    //AccessControl 权限验证
    func (o *GateWayService) AccessControl() error {
        if o.currentModule.AccessControl == nil {
            return nil
        }
        ctx := public.NewContext(o.w, o.req)
        var errmsg string
        switch {
        case !AuthModuleOpened(o, ctx):
            public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
                "msg": "access_control_not_open",
            })
            return nil
        case AuthInBlackIPList(o, ctx):
            public.ContextNotice(o.req.Context(), DLTagAccessControlFailure, map[string]interface{}{
                "msg": "AuthInBlackIPList",
            })
            return errors.New("msg:AuthInBlackIPList")
        case AuthInWhiteIPList(o, ctx):
            public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
                "msg": "AuthWhiteIPList_success",
            })
            return nil
        case AuthInWhiteHostList(o, ctx):
            public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
                "msg": "AuthWhiteHostList_success",
            })
            return nil
        case AuthRegisterFunc(o, &errmsg):
            public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
                "msg": "AuthRegisterFunc_success",
            })
            return nil
        }
        if errmsg==""{
            errmsg="auth_failure"
        }
        public.ContextWarning(o.req.Context(), DLTagAccessControlFailure, map[string]interface{}{
            "msg": errmsg,
        })
        return errors.New(errmsg)
    }
    
    • HTTPLimit
      限流控制中间件,以保护后端的realserver服务器以免过载导致服务异常不可用。
    //HTTPLimit http限流中间件
    func HTTPLimit() gin.HandlerFunc {
        return func(c *gin.Context) {
            //获取上游服务
            gws, ok := c.MustGet(MiddlewareServiceKey).(*service.GateWayService)
            if !ok {
                public.ResponseError(c, http.StatusBadRequest, errors.New("gateway_service not valid"))
                return
            }
    
            //入口流量统计
            currentModule := gws.CurrentModule()
            counter := public.FlowCounterHandler.GetRequestCounter(currentModule.Base.Name)
            counter.Increase(c.Request.Context(), c.Request.RemoteAddr)
    
            //客户端ip限流
            remoteIP := public.Substr(c.Request.RemoteAddr, 0, int64(strings.Index(c.Request.RemoteAddr, ":")))
            if currentModule.AccessControl.ClientFlowLimit > 0 {
                limiter := public.FlowLimiterHandler.GetModuleIPVisitor(currentModule.Base.Name+"_"+remoteIP, currentModule.AccessControl.ClientFlowLimit)
                if limiter.Allow() == false {
                    errmsg := fmt.Sprintf("moduleName:%s remoteIP:%s, QPS limit : %d, %d", currentModule.Base.Name, remoteIP, int64(limiter.Limit()), limiter.Burst())
                    public.ContextWarning(c.Request.Context(), service.DLTagAccessControlFailure, map[string]interface{}{
                        "msg":        errmsg,
                        "ip":         remoteIP,
                        "moduleName": currentModule.Base.Name,
                    })
                    public.ResponseError(c, http.StatusBadRequest, errors.New(errmsg))
                }
            }
    
            //todo
            c.Next()
        }
    }
    
    • LoadBalance
      该中间件是最核心的部门,它负责整个负责均衡的处理,负责根据匹配规则创建正确的proxy,进行正常服务的处理。目前提供的负载均衡策略是rr,同时也实现了后端realserver的探活自动剔除策略。
    //load_balance.go
    //LoadBalance 负载均衡中间件
    func LoadBalance() gin.HandlerFunc {
        return func(c *gin.Context) {
            gws,ok:=c.MustGet(MiddlewareServiceKey).(*service.GateWayService)
            if !ok{
                public.ResponseError(c, http.StatusBadRequest, errors.New("gateway_service not valid"))
                return
            }
    //进入核心负载算法,选出正常的服务
            proxy, err := gws.LoadBalance()
            if err != nil {
                public.ResponseError(c, http.StatusProxyAuthRequired, err)
                return
            }
            requestBody,ok:=c.MustGet(MiddlewareRequestBodyKey).([]byte)
            if !ok{
                public.ResponseError(c, http.StatusBadRequest, errors.New("request_body not valid"))
                return
            }
            c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(requestBody))
            proxy.ServeHTTP(c.Writer, c.Request)
            c.Abort()
        }
    }
    
    //LoadBalance 请求负载
    func (o *GateWayService) LoadBalance() (*httputil.ReverseProxy, error) {
        ipList, err := SysConfMgr.GetModuleIPList(o.currentModule.Base.Name)
        if err != nil {
            public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
                "msg":             err,
                "modulename":      o.currentModule.Base.Name,
                "availableIpList": SysConfMgr.GetAvaliableIPList(o.currentModule.Base.Name),
            })
            return nil, errors.New("get_iplist_error")
        }
        if len(ipList) == 0 {
            public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
                "msg":             "empty_iplist_error",
                "modulename":      o.currentModule.Base.Name,
                "availableIpList": SysConfMgr.GetAvaliableIPList(o.currentModule.Base.Name),
            })
            return nil, errors.New("empty_iplist_error")
        }
    //正常proxy的遴选
        proxy, err := o.GetModuleHTTPProxy()
        if err != nil {
            public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
                "msg":       err,
                "module":    o.currentModule.Base.Name,
            })
            return nil, err
        }
        return proxy, nil
    }
    
    //GetModuleHTTPProxy 获取模块的代理
    func (o *GateWayService) GetModuleHTTPProxy() (*httputil.ReverseProxy, error) {
        proxy,err:=SysConfMgr.GetModuleHTTPProxy(o.currentModule.Base.Name)
        if err != nil {
            public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
                "err":       err,
                "module":    o.currentModule.Base.Name,
            })
            return &httputil.ReverseProxy{}, err
        }
        return proxy,nil
    }
    
    //GetModuleHTTPProxy 获取http代理方法
    func (s *SysConfigManage) GetModuleHTTPProxy(moduleName string) (*httputil.ReverseProxy, error) {
    //基于rr的module选择
        rr, err := s.GetModuleRR(moduleName)
        if err != nil {
            return nil, err
        }
        s.moduleProxyFuncMapLocker.RLock()
        defer s.moduleProxyFuncMapLocker.RUnlock()
        proxyFunc, ok := s.moduleProxyFuncMap[moduleName]
        if ok {
            return proxyFunc(rr), nil
        }
        return nil, errors.New("module proxy empty")
    }
    

    以上,我们对getekeeper的启动以及核心的组件加载和请求流程进行了分析,从源码上看,getekeeper的代码思路还是很清晰明了的。

    本节完。

    相关文章

      网友评论

          本文标题:通过项目学习Go语言之gatekeeper请求流程

          本文链接:https://www.haomeiwen.com/subject/cdfjnctx.html