在前面的《通过项目学习Go语言之...》系列文章中,我们对Go语言开发环境的配置以及开发项目最基础的功能之go mod、log做了一些入门的讲解。同时,也对项目gatekeeper中使用的核心组件gin做了一个稍微详细的说明。
本篇,将通过分析gatekeeper项目结构,理清通过代理到转发到后端realserver以及将代理结果最终响应请求的全流程。
下面先看一张getekeeper的http网关代理的请求响应全流程图:
启动请求全景图
接下来,我们对整个服务的启动和重要的流程节点进行源码的分析。
启动
一般服务启动过程中主要是进行初始化、回调钩子和监听注册等基础性工作。gatekeeper也不例外,再启动过程中,进行了配置加载、数据库访问初始化、http和tcp检测注册、系统信号监听等一系列工作。
//main.go
func main() {
conf = flag.String("config", "./conf/dev/", "input config file like ./conf/dev/")
flag.Parse()
//初始化mysql redis配置
lib.InitModule(*conf,[]string{"base","mysql","redis",})
defer lib.Destroy()
public.InitMysql()
public.InitConf()
//初始化配置、配置管理设置实时监控配置变化并刷新
service.SysConfMgr = service.NewSysConfigManage()
service.SysConfMgr.InitConfig()
service.SysConfMgr.MonitorConfig()
//注册请求前验证request方法,进行请求处理前的验证,只允许授权方访问
//目前支持的是固定的配置模式,可以根据需求进行自定义
service.RegisterBeforeRequestAuthFunc(service.AuthAppToken)
//注册请求后更改response方法
service.RegisterModifyResponseFunc(service.FilterCityData([]string{"/gatekeeper/tester_filter/goods_list"}))
//启动http、tcp监听
router.HTTPServerRun()
router.TCPServerRun()
//注册系统信号监听
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
//注册服务优雅关闭
router.TCPServerStop()
router.HTTPServerStop()
signal.Stop(quit)
}
Http服务监听是处理来自外部的全部请求,下面看一下HTTPServerRun,getekeeper的http服务是基于gin来实现的。
//httpserver.go
//HTTPServerRun 服务启动
func HTTPServerRun() {
//设置运行模式 debug
gin.SetMode(lib.ConfBase.DebugMode)
//初始化路由
r := InitRouter()
//设置http server运行参数
HTTPSrvHandler = &http.Server{
Addr: lib.GetStringConf("base.http.addr"),
Handler: r,
ReadTimeout: time.Duration(lib.GetIntConf("base.http.read_timeout")) * time.Second,
WriteTimeout: time.Duration(lib.GetIntConf("base.http.write_timeout")) * time.Second,
MaxHeaderBytes: 1 << uint(lib.GetIntConf("base.http.max_header_bytes")),
}
//设置recover 以实现遇到panic时服务不中断
//启动http服务
go func() {
defer func() {
if err := recover(); err != nil {
public.SysLogger.Error("HttpServerRun_recover:%v", err)
}
}()
log.Printf(" [INFO] HttpServer %s listening\n",lib.GetStringConf("base.http.addr"))
if err := HTTPSrvHandler.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf(" [ERROR] HttpServer %s err:%v\n", lib.GetStringConf("base.http.addr"), err)
}
}()
}
路由注册
在上面启动http服务的代码中,调用了InitRouter()对相关的路由进行的设置,分别注册了四组路由:admin(管理后台)、gateway(服务探活ping)、cluster(集群管理,reload处理配置变更刷新)、gatekeeper(网关代理服务);另外还启动了一个静态服务器/assets,用于管理后台资源的服务器。下面我们看一下详细的代码:
//httprouter.go
//InitRouter 声明http路由
func InitRouter() *gin.Engine {
//创建路由r(engine),注册中间件
router := gin.New()
//加载gin内置中间件,recovery所有panic是服务长久运行不中断
router.Use(middleware.Recovery())
//admin
admin := router.Group("/admin")
admin.Use(middleware.RequestTraceLog())
{
controller.AdminRegister(admin)
}
//assets
router.Static("/assets", "./tmpl/green/assets")
//gateway
//提供探活
gateway := controller.Gateway{}
router.GET("/ping", gateway.Ping)
//cluster
//提供服务配置刷新
csr:=router.Group("/")
csr.Use(middleware.ClusterAuth())
csr.GET("/reload", gateway.Reload)
//注册gatekeeper路由组,代理所有网关请求
gw:=router.Group(lib.GetStringConf("base.http.route_prefix"))
//注册一系列中间件
gw.Use(
middleware.RequestTraceLog(),
middleware.MatchRule(),
middleware.AccessControl(),
middleware.HTTPLimit(),
//todo 拓展中间件
//核心中间件部分
middleware.LoadBalance())
{
gw.GET("/*action", gateway.Index)
gw.POST("/*action", gateway.Index)
gw.DELETE("/*action", gateway.Index)
gw.OPTIONS("/*action", gateway.Index)
}
return router
}
中间件注册
在上面的代码中,可以看到默认注册了RequestTraceLog(),MatchRule(),AccessControl(),HTTPLimit(),LoadBalance()五个getekeeper项目自定义的中间件,分配用于请求treace日志记录、规则匹配、访问控制、限流和负载均衡。
- RequestTraceLog
通过trace可以快速定位请求链,在进行微服务构建时,使用teaceid机制能够很方便的绘制请求的整个链路,以方便问题的排查和各段服务的耗时记录,优化服务。
//trace_log.go
//RequestTraceLog trace中间件
func RequestTraceLog() gin.HandlerFunc {
return func(c *gin.Context) {
RequestInLog(c)
defer RequestOutLog(c)
c.Next()
}
}
//RequestInLog 请求进入日志
func RequestInLog(c *gin.Context) {
//初始化trance 默认分配一个traceid
traceContext := lib.NewTrace()
//如果能在header中获取到traceid,则使用已有的id
if traceID := c.Request.Header.Get("didi-header-rid"); traceID != "" {
traceContext.TraceId = traceID
}
if spanID := c.Request.Header.Get("didi-header-spanid"); spanID != "" {
traceContext.SpanId = spanID
}
...
//将trace相关的追踪信息保存进上下文request
c.Set("startExecTime", time.Now())
c.Set("trace", traceContext)
c.Request.Header.Set("didi-header-rid", traceContext.TraceId)
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), public.ContextKey("trace"), traceContext))
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), public.ContextKey("request_url"), c.Request.URL.Path))
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
}
- MatchRule
用于规则的匹配,基于分解URI实现和管理配置后台相对应的转发逻辑的匹配,
//match_rule.go
//MatchRule 匹配模块中间件
func MatchRule() gin.HandlerFunc {
return func(c *gin.Context) {
//初始化geteway service
gws := service.NewGateWayService(c.Writer, c.Request)
//进行规则匹配 选出匹配的module
if err := gws.MatchRule(); err != nil {
public.ResponseError(c, http.StatusBadRequest, err)
return
}
c.Set(MiddlewareServiceKey,gws)
}
}
- AccessControl
权限控制中间件,用于对外部过来的请求就行访问控制过滤,目前默认支持的包括IP黑白名单、主机白名单以及请求注册函数过滤
//gate_service.go
//AccessControl 权限验证
func (o *GateWayService) AccessControl() error {
if o.currentModule.AccessControl == nil {
return nil
}
ctx := public.NewContext(o.w, o.req)
var errmsg string
switch {
case !AuthModuleOpened(o, ctx):
public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
"msg": "access_control_not_open",
})
return nil
case AuthInBlackIPList(o, ctx):
public.ContextNotice(o.req.Context(), DLTagAccessControlFailure, map[string]interface{}{
"msg": "AuthInBlackIPList",
})
return errors.New("msg:AuthInBlackIPList")
case AuthInWhiteIPList(o, ctx):
public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
"msg": "AuthWhiteIPList_success",
})
return nil
case AuthInWhiteHostList(o, ctx):
public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
"msg": "AuthWhiteHostList_success",
})
return nil
case AuthRegisterFunc(o, &errmsg):
public.ContextNotice(o.req.Context(), DLTagAccessControlSuccess, map[string]interface{}{
"msg": "AuthRegisterFunc_success",
})
return nil
}
if errmsg==""{
errmsg="auth_failure"
}
public.ContextWarning(o.req.Context(), DLTagAccessControlFailure, map[string]interface{}{
"msg": errmsg,
})
return errors.New(errmsg)
}
- HTTPLimit
限流控制中间件,以保护后端的realserver服务器以免过载导致服务异常不可用。
//HTTPLimit http限流中间件
func HTTPLimit() gin.HandlerFunc {
return func(c *gin.Context) {
//获取上游服务
gws, ok := c.MustGet(MiddlewareServiceKey).(*service.GateWayService)
if !ok {
public.ResponseError(c, http.StatusBadRequest, errors.New("gateway_service not valid"))
return
}
//入口流量统计
currentModule := gws.CurrentModule()
counter := public.FlowCounterHandler.GetRequestCounter(currentModule.Base.Name)
counter.Increase(c.Request.Context(), c.Request.RemoteAddr)
//客户端ip限流
remoteIP := public.Substr(c.Request.RemoteAddr, 0, int64(strings.Index(c.Request.RemoteAddr, ":")))
if currentModule.AccessControl.ClientFlowLimit > 0 {
limiter := public.FlowLimiterHandler.GetModuleIPVisitor(currentModule.Base.Name+"_"+remoteIP, currentModule.AccessControl.ClientFlowLimit)
if limiter.Allow() == false {
errmsg := fmt.Sprintf("moduleName:%s remoteIP:%s, QPS limit : %d, %d", currentModule.Base.Name, remoteIP, int64(limiter.Limit()), limiter.Burst())
public.ContextWarning(c.Request.Context(), service.DLTagAccessControlFailure, map[string]interface{}{
"msg": errmsg,
"ip": remoteIP,
"moduleName": currentModule.Base.Name,
})
public.ResponseError(c, http.StatusBadRequest, errors.New(errmsg))
}
}
//todo
c.Next()
}
}
- LoadBalance
该中间件是最核心的部门,它负责整个负责均衡的处理,负责根据匹配规则创建正确的proxy,进行正常服务的处理。目前提供的负载均衡策略是rr,同时也实现了后端realserver的探活自动剔除策略。
//load_balance.go
//LoadBalance 负载均衡中间件
func LoadBalance() gin.HandlerFunc {
return func(c *gin.Context) {
gws,ok:=c.MustGet(MiddlewareServiceKey).(*service.GateWayService)
if !ok{
public.ResponseError(c, http.StatusBadRequest, errors.New("gateway_service not valid"))
return
}
//进入核心负载算法,选出正常的服务
proxy, err := gws.LoadBalance()
if err != nil {
public.ResponseError(c, http.StatusProxyAuthRequired, err)
return
}
requestBody,ok:=c.MustGet(MiddlewareRequestBodyKey).([]byte)
if !ok{
public.ResponseError(c, http.StatusBadRequest, errors.New("request_body not valid"))
return
}
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(requestBody))
proxy.ServeHTTP(c.Writer, c.Request)
c.Abort()
}
}
//LoadBalance 请求负载
func (o *GateWayService) LoadBalance() (*httputil.ReverseProxy, error) {
ipList, err := SysConfMgr.GetModuleIPList(o.currentModule.Base.Name)
if err != nil {
public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
"msg": err,
"modulename": o.currentModule.Base.Name,
"availableIpList": SysConfMgr.GetAvaliableIPList(o.currentModule.Base.Name),
})
return nil, errors.New("get_iplist_error")
}
if len(ipList) == 0 {
public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
"msg": "empty_iplist_error",
"modulename": o.currentModule.Base.Name,
"availableIpList": SysConfMgr.GetAvaliableIPList(o.currentModule.Base.Name),
})
return nil, errors.New("empty_iplist_error")
}
//正常proxy的遴选
proxy, err := o.GetModuleHTTPProxy()
if err != nil {
public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
"msg": err,
"module": o.currentModule.Base.Name,
})
return nil, err
}
return proxy, nil
}
//GetModuleHTTPProxy 获取模块的代理
func (o *GateWayService) GetModuleHTTPProxy() (*httputil.ReverseProxy, error) {
proxy,err:=SysConfMgr.GetModuleHTTPProxy(o.currentModule.Base.Name)
if err != nil {
public.ContextWarning(o.req.Context(), DLTagLoadBalanceFailure, map[string]interface{}{
"err": err,
"module": o.currentModule.Base.Name,
})
return &httputil.ReverseProxy{}, err
}
return proxy,nil
}
//GetModuleHTTPProxy 获取http代理方法
func (s *SysConfigManage) GetModuleHTTPProxy(moduleName string) (*httputil.ReverseProxy, error) {
//基于rr的module选择
rr, err := s.GetModuleRR(moduleName)
if err != nil {
return nil, err
}
s.moduleProxyFuncMapLocker.RLock()
defer s.moduleProxyFuncMapLocker.RUnlock()
proxyFunc, ok := s.moduleProxyFuncMap[moduleName]
if ok {
return proxyFunc(rr), nil
}
return nil, errors.New("module proxy empty")
}
以上,我们对getekeeper的启动以及核心的组件加载和请求流程进行了分析,从源码上看,getekeeper的代码思路还是很清晰明了的。
本节完。
网友评论