美文网首页Golang
Rpcx源码之网关Gateway

Rpcx源码之网关Gateway

作者: 神奇的考拉 | 来源:发表于2019-02-20 15:38 被阅读48次

一、rpcx中的网关

在使用rpcx过程中可能会存在其他语言比如Java、Python、C#等来调用rpcx服务,这意味着需要提供不同的rpcx的协议来支撑,不过rpcx目前已提供了GateWay来为对应的rpcx服务提供了http网关服务,直接通过http的方式来完成不同语言对rpcx服务的调用。
目前rpcx提供了两种部署模式:
1、Gateway:将网关服务单独部署,所有client将http请求发送给gateway,由gateway来完成将http请求转换rpcx请求,再调用对应的rpcx服务,最终再将rpcx返回结果转换成http的response,返回给client


gateway模式

2、Agent:将网关程序与client程序部署在一起,agent以一个后端服务的形式存在client机器上,即使机器上已部署多个client,也只需一个agent来完成将client发送的http请求到本地的agent后端服务,再由agent后端服务将http请求转为rpcx请求,再转发给rpcx服务,最终将rpcx服务返回的结果转为http的response返回给client。


agent模式

二、rpcx的网关应用

在server.go文件中有如下的代码来开启gateway: 在执行server.Serve(network,addrewss)时,会尝试启动一个gateway

func (s *Server) Serve(network, address string) (err error) {
    s.startShutdownListener()
    var ln net.Listener
    ln, err = s.makeListener(network, address)
    if err != nil {
        return
    }

    if network == "http" {
        s.serveByHTTP(ln, "")
        return nil
    }

    // try to start gateway
    ln = s.startGateway(network, ln)

    return s.serveListener(ln)
}

接下来通过一个简单的demo来了解gateway:
server代码:相应的代码没有特殊的地方

package main

import (
    "flag"
    "github.com/smallnest/rpcx/server"
    "rpcx/examples/models"
)

var (
    addr = flag.String("addr","localhost:8972","server address")
)

func main() {
    flag.Parse()

    // 简单创建一个server 主要验证gateway
    s := server.NewServer()
    s.Register(new(models.Arith),"")
    s.Serve("tcp",*addr) 
    //s.Serve("http", *addr)
}

client代码:不同于普通的rpcx请求,需要创建一个http的request,在针对http的request进行加工,主要是header的设置

package main

import (
    "bytes"
    "flag"
    "io/ioutil"
    "net/http"
    "github.com/smallnest/rpcx/codec"
    "github.com/rpcx-ecosystem/rpcx-gateway"
    "rpcx/examples/models"
    "log"
)

var (
    addr = flag.String("addr","localhost:8972","server address")
)

func main() {
    cc := &codec.MsgpackCodec{}

    args := &models.Args{
        A: 100,
        B: 200,
    }

    data, _ := cc.Encode(args)
    // request
    req,err := http.NewRequest("POST","http://127.0.0.1:8972/", bytes.NewReader(data))
    if err != nil{
        log.Fatalf("failed to create request: ", err)
        return
    }

    // 设置header
    h := req.Header
    h.Set(gateway.XMessageID,"10000")
    h.Set(gateway.XMessageType,"0")
    h.Set(gateway.XSerializeType,"3")
    h.Set(gateway.XServicePath,"Arith")
    h.Set(gateway.XServiceMethod,"Mul")

    // 发送http请求
    //  http请求===>rpcx请求===>rpcx服务===>返回rpcx结果===>转换为http的response===>输出到client
    res, err := http.DefaultClient.Do(req)
    if err != nil{
        log.Fatalf("failed to call: ", err)
    }
    defer res.Body.Close()
    // 获取结果
    replyData, err := ioutil.ReadAll(res.Body)
    if err != nil{
        log.Fatalf("failed to read response: ", err)
    }
        // 解码
    reply := &models.Reply{}
    err = cc.Decode(replyData, reply)
    if err != nil{
        log.Fatalf("failed to decode reply: ", err)
    }
    log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}

关于gateway具体处理部分放到第三部分讲解,上面关于rpcx的gateway使用完成,直接启动server、client即可看到输出结果:

2019/02/20 14:35:33 response meta data = map[X-RPCX-MessageStatusType:Normal 
              X-RPCX-Meta: 
              X-RPCX-SerializeType:3 
              X-RPCX-MessageID:10000 
              X-RPCX-ServicePath:Arith 
              X-RPCX-ServiceMethod:Mul 
              X-RPCX-Version:0]
2019/02/20 14:35:33 100 * 200 = 20000

从输出结果的记录日志和上面client中代码可以看到这里面涉及到的http协议转换为rpcx协议涉及到如下的内容:

  • http请求request的header设置

X-RPCX-Version: rpcx 版本
X-RPCX-MesssageType: 设置为0,代表request
X-RPCX-Heartbeat: 是否是heartbeat请求, 默认=false
X-RPCX-Oneway: 是否是单向请求,默认=false.
X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
X-RPCX-MessageID: 消息id, uint64 类型
X-RPCX-ServicePath: service path
X-RPCX-ServiceMethod: service method
X-RPCX-Meta: 额外的metadata数据

  • http响应response的header设置

X-RPCX-Version: rpcx 版本
X-RPCX-MesssageType: 1 ,代表response
X-RPCX-Heartbeat: 是否是heartbeat请求
X-RPCX-MessageStatusType: Error 还是正常返回结果
X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
X-RPCX-MessageID: 消息id, uint64 类型
X-RPCX-ServicePath: service path
X-RPCX-ServiceMethod: service method
X-RPCX-Meta: 额外的metadata
X-RPCX-ErrorMessage: 错误信息, 需要设置X-RPCX-MessageStatusType为Error

三、rpcx的源码剖析

在server执行serve时,有一个启动gateway的操作

s.startGateway(network, ln)

而整个rpcx中关于gateway的部分是在server/gateway.go
startGateway主要完成开启http服务接收请求,同时也会增加一些http的路由:完成将http请求转为rpcx请求,以及返回结果转为http的response在输出给客户端client。

func (s *Server) startGateway(network string, ln net.Listener) net.Listener {
    if network != "tcp" && network != "tcp4" && network != "tcp6" { // network检查
        log.Infof("network is not tcp/tcp4/tcp6 so can not start gateway")
        return ln
    }

    m := cmux.New(ln) // 将当前connection包装成多路复用的模式

    // 指定优先级
    httpLn := m.Match(cmux.HTTP1Fast())  // 只匹配http的请求
    rpcxLn := m.Match(cmux.Any()) // 匹配任意请求

    go s.startHTTP1APIGateway(httpLn) // http路由规则 真正处理:handleGatewayRequest
    go m.Serve()

    return rpcxLn
}

真正完成gateway的协议转换的操作在handleGatewayRequest方法中,也是在启动过程中绑定处理http请求的handler

func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
       // 主要是对http请求头header中参数检查:比如XServicePath
       ...省略代码
    servicePath := r.Header.Get(XServicePath) // 获取header中的XServicePath
       ...省略代码
    req, err := HTTPRequest2RpcxRequest(r) // 将http请求转为rpcx请求
    defer protocol.FreeMsg(req) // 为了复用protocol.Message

    //设置响应头
    ...省略代码
       // 通过context.Context传递开始请求时间 为了timeout操作的处理
    ctx := context.WithValue(context.Background(), StartRequestContextKey, time.Now().UnixNano())
    err = s.auth(ctx, req) // 对request进行认证(需要以Option方式提供AuthFunc)
    if err != nil { // 认证过程出现error 直接将error返回给client
        s.Plugins.DoPreWriteResponse(ctx, req, nil)
        wh.Set(XMessageStatusType, "Error")
        wh.Set(XErrorMessage, err.Error())
        w.WriteHeader(401)
        s.Plugins.DoPostWriteResponse(ctx, req, req.Clone(), err)
        return
    }

    resMetadata := make(map[string]string)
    newCtx := context.WithValue(context.WithValue(ctx, share.ReqMetaDataKey, req.Metadata),
        share.ResMetaDataKey, resMetadata)

    res, err := s.handleRequest(newCtx, req) // 处理请求
    defer protocol.FreeMsg(res)

    if err != nil { // 处理过程出现error 直接将error返回给client
        log.Warnf("rpcx: failed to handle gateway request: %v", err)
        wh.Set(XMessageStatusType, "Error")
        wh.Set(XErrorMessage, err.Error())
        w.WriteHeader(500)
        return
    }

       // 针对rpcx的响应结果进行前置和输出操作

    s.Plugins.DoPreWriteResponse(newCtx, req, nil) // 响应前置操作
    if len(resMetadata) > 0 { //copy meta in context to request
        meta := res.Metadata
        if meta == nil {
            res.Metadata = resMetadata
        } else {
            for k, v := range resMetadata {
                meta[k] = v
            }
        }
    }

    meta := url.Values{}
    for k, v := range res.Metadata {
        meta.Add(k, v)
    }
    wh.Set(XMeta, meta.Encode())
    w.Write(res.Payload)
    s.Plugins.DoPostWriteResponse(newCtx, req, res, err) // 输出结果 
}

将http请求转为rpcx请求之后的请求处理和普通rpcx请求处理无异:handleRequest

func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
        // 获取请求中的参数
        // 获取本地注册记录中具体处理业务的service,完成相应的处理
        // 构建response对象 填充对应的参数内容
        // 输出protocol.Message即是rpcx的返回结果
    
        // service完成对应的操作:需注意提供service有method和function两类
    mtype := service.method[methodName]
    if mtype == nil { 
        if service.function[methodName] != nil { //service提供function来完成服务
            return s.handleRequestForFunction(ctx, req)
        }
        err = errors.New("rpcx: can't find method " + methodName)
        return handleError(res, err)
    }
        // service提供method完成服务
    var argv = argsReplyPools.Get(mtype.ArgType) // 参数类型

    codec := share.Codecs[req.SerializeType()] // 序列化方式
    ...省略代码
    err = codec.Decode(req.Payload, argv) // 解码
    ...省略代码
    replyv := argsReplyPools.Get(mtype.ReplyType) // 返回类型

    if mtype.ArgType.Kind() != reflect.Ptr { // 参数类型是否为指针
        err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
    } else { // 输入参数为指针
        err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
    }
        // 响应体
    if !req.IsOneway() {
        data, err := codec.Encode(replyv) // 编码
        argsReplyPools.Put(mtype.ReplyType, replyv)
        if err != nil {
            return handleError(res, err)

        }
        res.Payload = data
    }

    return res, nil
}

如上基本上罗列出采用gateway方式请求rpcx服务来支持不同的语言通过http协议的调用过程。

四、后记

在源码剖析中并没有将所有代码罗列出来,详细源码剖析见gateway源码

调用过程

相关文章

  • Rpcx源码之网关Gateway

    一、rpcx中的网关 在使用rpcx过程中可能会存在其他语言比如Java、Python、C#等来调用rpcx服务,...

  • Api网关(Spring cloud Gateway)

    1. Spring cloud Gateway网关 什么是网关? 网关就是网络请求的统一入口。gateway是sp...

  • Gateway网关

    Gateway网关: 在介绍之前,先要明白Gateway的作用,为什么用Gateway。 从图中能够看到,网关在微...

  • 高级框架第十二天Gateway:

    Gateway:网关 主要内容 1.API网关 2.Spring Cloud Gateway介绍 3.Gatewa...

  • SpringCloud技术指南系列(十二)API网关之Gatew

    SpringCloud技术指南系列(十二)API网关之Gateway使用 一、概述 API网关是一个更为智能的应用...

  • 基于Zuul实现API Gateway

    本文内容: 简述API Gateway 使用Zuul实现API网关 Zuul的原理 源码及参考资料 简述API G...

  • Rpcx源码之Server

    一、概述 在Rpcx框架源码中,存在Server的角色,用来完成承担Server stub;相对来说Server,...

  • Rpcx源码之Client

    一、概述 在Rpcx框架源码中,存在Client的角色,用来完成承担Client stub;相对来说Server,...

  • Gateway

    Gateway | 简介 Spring Cloud Gateway 是 Spring 官方提供的 API 网关; ...

  • 2020-02-14 网关

    网关的定义 网关(Gateway)又称网间连接器、协议转换器。顾名思义,网关(Gateway)就是一个网络连接到另...

网友评论

    本文标题:Rpcx源码之网关Gateway

    本文链接:https://www.haomeiwen.com/subject/yrgcyqtx.html