美文网首页Golang
Rpcx源码之网关Gateway

Rpcx源码之网关Gateway

作者: 神奇的考拉 | 来源:发表于2019-02-20 15:38 被阅读48次

    一、rpcx中的网关

    在使用rpcx过程中可能会存在其他语言比如Java、Python、C#等来调用rpcx服务,这意味着需要提供不同的rpcx的协议来支撑,不过rpcx目前已提供了GateWay来为对应的rpcx服务提供了http网关服务,直接通过http的方式来完成不同语言对rpcx服务的调用。
    目前rpcx提供了两种部署模式:
    1、Gateway:将网关服务单独部署,所有client将http请求发送给gateway,由gateway来完成将http请求转换rpcx请求,再调用对应的rpcx服务,最终再将rpcx返回结果转换成http的response,返回给client


    gateway模式

    2、Agent:将网关程序与client程序部署在一起,agent以一个后端服务的形式存在client机器上,即使机器上已部署多个client,也只需一个agent来完成将client发送的http请求到本地的agent后端服务,再由agent后端服务将http请求转为rpcx请求,再转发给rpcx服务,最终将rpcx服务返回的结果转为http的response返回给client。


    agent模式

    二、rpcx的网关应用

    在server.go文件中有如下的代码来开启gateway: 在执行server.Serve(network,addrewss)时,会尝试启动一个gateway

    func (s *Server) Serve(network, address string) (err error) {
        s.startShutdownListener()
        var ln net.Listener
        ln, err = s.makeListener(network, address)
        if err != nil {
            return
        }
    
        if network == "http" {
            s.serveByHTTP(ln, "")
            return nil
        }
    
        // try to start gateway
        ln = s.startGateway(network, ln)
    
        return s.serveListener(ln)
    }
    

    接下来通过一个简单的demo来了解gateway:
    server代码:相应的代码没有特殊的地方

    package main
    
    import (
        "flag"
        "github.com/smallnest/rpcx/server"
        "rpcx/examples/models"
    )
    
    var (
        addr = flag.String("addr","localhost:8972","server address")
    )
    
    func main() {
        flag.Parse()
    
        // 简单创建一个server 主要验证gateway
        s := server.NewServer()
        s.Register(new(models.Arith),"")
        s.Serve("tcp",*addr) 
        //s.Serve("http", *addr)
    }
    

    client代码:不同于普通的rpcx请求,需要创建一个http的request,在针对http的request进行加工,主要是header的设置

    package main
    
    import (
        "bytes"
        "flag"
        "io/ioutil"
        "net/http"
        "github.com/smallnest/rpcx/codec"
        "github.com/rpcx-ecosystem/rpcx-gateway"
        "rpcx/examples/models"
        "log"
    )
    
    var (
        addr = flag.String("addr","localhost:8972","server address")
    )
    
    func main() {
        cc := &codec.MsgpackCodec{}
    
        args := &models.Args{
            A: 100,
            B: 200,
        }
    
        data, _ := cc.Encode(args)
        // request
        req,err := http.NewRequest("POST","http://127.0.0.1:8972/", bytes.NewReader(data))
        if err != nil{
            log.Fatalf("failed to create request: ", err)
            return
        }
    
        // 设置header
        h := req.Header
        h.Set(gateway.XMessageID,"10000")
        h.Set(gateway.XMessageType,"0")
        h.Set(gateway.XSerializeType,"3")
        h.Set(gateway.XServicePath,"Arith")
        h.Set(gateway.XServiceMethod,"Mul")
    
        // 发送http请求
        //  http请求===>rpcx请求===>rpcx服务===>返回rpcx结果===>转换为http的response===>输出到client
        res, err := http.DefaultClient.Do(req)
        if err != nil{
            log.Fatalf("failed to call: ", err)
        }
        defer res.Body.Close()
        // 获取结果
        replyData, err := ioutil.ReadAll(res.Body)
        if err != nil{
            log.Fatalf("failed to read response: ", err)
        }
            // 解码
        reply := &models.Reply{}
        err = cc.Decode(replyData, reply)
        if err != nil{
            log.Fatalf("failed to decode reply: ", err)
        }
        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    }
    

    关于gateway具体处理部分放到第三部分讲解,上面关于rpcx的gateway使用完成,直接启动server、client即可看到输出结果:

    2019/02/20 14:35:33 response meta data = map[X-RPCX-MessageStatusType:Normal 
                  X-RPCX-Meta: 
                  X-RPCX-SerializeType:3 
                  X-RPCX-MessageID:10000 
                  X-RPCX-ServicePath:Arith 
                  X-RPCX-ServiceMethod:Mul 
                  X-RPCX-Version:0]
    2019/02/20 14:35:33 100 * 200 = 20000
    

    从输出结果的记录日志和上面client中代码可以看到这里面涉及到的http协议转换为rpcx协议涉及到如下的内容:

    • http请求request的header设置

    X-RPCX-Version: rpcx 版本
    X-RPCX-MesssageType: 设置为0,代表request
    X-RPCX-Heartbeat: 是否是heartbeat请求, 默认=false
    X-RPCX-Oneway: 是否是单向请求,默认=false.
    X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
    X-RPCX-MessageID: 消息id, uint64 类型
    X-RPCX-ServicePath: service path
    X-RPCX-ServiceMethod: service method
    X-RPCX-Meta: 额外的metadata数据

    • http响应response的header设置

    X-RPCX-Version: rpcx 版本
    X-RPCX-MesssageType: 1 ,代表response
    X-RPCX-Heartbeat: 是否是heartbeat请求
    X-RPCX-MessageStatusType: Error 还是正常返回结果
    X-RPCX-SerializeType: 0 as raw bytes, 1 as JSON, 2 as protobuf, 3 as msgpack
    X-RPCX-MessageID: 消息id, uint64 类型
    X-RPCX-ServicePath: service path
    X-RPCX-ServiceMethod: service method
    X-RPCX-Meta: 额外的metadata
    X-RPCX-ErrorMessage: 错误信息, 需要设置X-RPCX-MessageStatusType为Error

    三、rpcx的源码剖析

    在server执行serve时,有一个启动gateway的操作

    s.startGateway(network, ln)
    

    而整个rpcx中关于gateway的部分是在server/gateway.go
    startGateway主要完成开启http服务接收请求,同时也会增加一些http的路由:完成将http请求转为rpcx请求,以及返回结果转为http的response在输出给客户端client。

    func (s *Server) startGateway(network string, ln net.Listener) net.Listener {
        if network != "tcp" && network != "tcp4" && network != "tcp6" { // network检查
            log.Infof("network is not tcp/tcp4/tcp6 so can not start gateway")
            return ln
        }
    
        m := cmux.New(ln) // 将当前connection包装成多路复用的模式
    
        // 指定优先级
        httpLn := m.Match(cmux.HTTP1Fast())  // 只匹配http的请求
        rpcxLn := m.Match(cmux.Any()) // 匹配任意请求
    
        go s.startHTTP1APIGateway(httpLn) // http路由规则 真正处理:handleGatewayRequest
        go m.Serve()
    
        return rpcxLn
    }
    

    真正完成gateway的协议转换的操作在handleGatewayRequest方法中,也是在启动过程中绑定处理http请求的handler

    func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
           // 主要是对http请求头header中参数检查:比如XServicePath
           ...省略代码
        servicePath := r.Header.Get(XServicePath) // 获取header中的XServicePath
           ...省略代码
        req, err := HTTPRequest2RpcxRequest(r) // 将http请求转为rpcx请求
        defer protocol.FreeMsg(req) // 为了复用protocol.Message
    
        //设置响应头
        ...省略代码
           // 通过context.Context传递开始请求时间 为了timeout操作的处理
        ctx := context.WithValue(context.Background(), StartRequestContextKey, time.Now().UnixNano())
        err = s.auth(ctx, req) // 对request进行认证(需要以Option方式提供AuthFunc)
        if err != nil { // 认证过程出现error 直接将error返回给client
            s.Plugins.DoPreWriteResponse(ctx, req, nil)
            wh.Set(XMessageStatusType, "Error")
            wh.Set(XErrorMessage, err.Error())
            w.WriteHeader(401)
            s.Plugins.DoPostWriteResponse(ctx, req, req.Clone(), err)
            return
        }
    
        resMetadata := make(map[string]string)
        newCtx := context.WithValue(context.WithValue(ctx, share.ReqMetaDataKey, req.Metadata),
            share.ResMetaDataKey, resMetadata)
    
        res, err := s.handleRequest(newCtx, req) // 处理请求
        defer protocol.FreeMsg(res)
    
        if err != nil { // 处理过程出现error 直接将error返回给client
            log.Warnf("rpcx: failed to handle gateway request: %v", err)
            wh.Set(XMessageStatusType, "Error")
            wh.Set(XErrorMessage, err.Error())
            w.WriteHeader(500)
            return
        }
    
           // 针对rpcx的响应结果进行前置和输出操作
    
        s.Plugins.DoPreWriteResponse(newCtx, req, nil) // 响应前置操作
        if len(resMetadata) > 0 { //copy meta in context to request
            meta := res.Metadata
            if meta == nil {
                res.Metadata = resMetadata
            } else {
                for k, v := range resMetadata {
                    meta[k] = v
                }
            }
        }
    
        meta := url.Values{}
        for k, v := range res.Metadata {
            meta.Add(k, v)
        }
        wh.Set(XMeta, meta.Encode())
        w.Write(res.Payload)
        s.Plugins.DoPostWriteResponse(newCtx, req, res, err) // 输出结果 
    }
    

    将http请求转为rpcx请求之后的请求处理和普通rpcx请求处理无异:handleRequest

    func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
            // 获取请求中的参数
            // 获取本地注册记录中具体处理业务的service,完成相应的处理
            // 构建response对象 填充对应的参数内容
            // 输出protocol.Message即是rpcx的返回结果
        
            // service完成对应的操作:需注意提供service有method和function两类
        mtype := service.method[methodName]
        if mtype == nil { 
            if service.function[methodName] != nil { //service提供function来完成服务
                return s.handleRequestForFunction(ctx, req)
            }
            err = errors.New("rpcx: can't find method " + methodName)
            return handleError(res, err)
        }
            // service提供method完成服务
        var argv = argsReplyPools.Get(mtype.ArgType) // 参数类型
    
        codec := share.Codecs[req.SerializeType()] // 序列化方式
        ...省略代码
        err = codec.Decode(req.Payload, argv) // 解码
        ...省略代码
        replyv := argsReplyPools.Get(mtype.ReplyType) // 返回类型
    
        if mtype.ArgType.Kind() != reflect.Ptr { // 参数类型是否为指针
            err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
        } else { // 输入参数为指针
            err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
        }
            // 响应体
        if !req.IsOneway() {
            data, err := codec.Encode(replyv) // 编码
            argsReplyPools.Put(mtype.ReplyType, replyv)
            if err != nil {
                return handleError(res, err)
    
            }
            res.Payload = data
        }
    
        return res, nil
    }
    

    如上基本上罗列出采用gateway方式请求rpcx服务来支持不同的语言通过http协议的调用过程。

    四、后记

    在源码剖析中并没有将所有代码罗列出来,详细源码剖析见gateway源码

    调用过程

    相关文章

      网友评论

        本文标题:Rpcx源码之网关Gateway

        本文链接:https://www.haomeiwen.com/subject/yrgcyqtx.html