美文网首页
grpc 初体验

grpc 初体验

作者: Newzer | 来源:发表于2023-03-05 17:41 被阅读0次

    之前很多次看到 grpc ,但一直都机会没有投产使用过,最近机会终于来了,我们项目需要用grpc了,我是对接者,也就是grpc 的客户端,下面就来说说我的流程吧
    首先我去看了挺多关于protobuf 和grpc 和rpc 相关的知识,就是扫盲吧。

    1.编写proto文件,其实这个文件是服务端那边定的,就像http接口一样,基本都是服务端来定义接口参数和格式,因此服务端把他的proto文件给我了,我点开一看,里面有很多东西,其中有很多的部分都是我不需要的,因为他还要给别的服务提供grpc接口,他全部写在一个文件里了(我认为最好的做法是分开,分别给多个客户端他自己的proto文件),虽然他说不用管其他的部分,但是我还是只想要我会用的,因此我删掉了很多
    文件基本内容:
    定义服务:服务的概念就是一组接口,相当于一个结构体或者类吧,后面要调用接口时会用到这个,在服务里定义几个接口,一般情况下,接口入参是一个接口体,接口出参也是一个结构体
    定义服务的时候有一个很重要的标识:stream, 如果 stream 在入参前面则表示客户端流式rpc,stream 在出参前面表示服务端流式rpc,如果两个都有,则表示双向流式rpc,如果都没有表示简单rpc,这个东西对长连接很有用,后面能看到
    我们是定义了一个服务端流式rpc
    service Linker {
    rpc 接口1(SubState) returns (stream ReportData) {}
    }
    服务必须用service 来定义

    2.定义message,也就是入参出参的结构体,包含类型和id
    message SubState {
    string consumer_group = 2;
    string client_id= 3;
    }
    message ReportData {
    string cabinet_no = 1; //依次是字段类型,字段名,id
    string topic = 2;
    string content = 3;
    string id = 4;
    }
    结构体必须用message 来定义
    为什么要定义id的值为数字呢,这是因为 rpc 数据传输并不是使用字段名的,是使用我们定义的这个id值,这样可以提高传输效率,字段名是给我们人看的,id的值直接按顺序写下来就好

    3.生成关键代码,因为 protobuf 是不区分编程语言的数据结构,因此需要用专门的工具来生成特定语言的代码,这个就是 protoc,protoc 安装就不赘述了,直接上命令
    protoc --go_out=../ --go-grpc_out=../ *.proto
    第一个选项是生成消息序列化代码,grpc 的客户端和服务端都会使用到这里的代码,不要改
    第二个选项是生成 grpc 的代码,包含客户端和服务端的代码

    4.调用接口

    package main
    
    import (
        service "grpc-c/service"
    )
    
    func main() {
        service.ConsumerData()
    }
    
    
    func ConsumerData() error {
        defer func() {//防止首次连接时 panic
            if r := recover(); r != nil {
                fmt.Println("grpc 连接错误!!!")
                time.Sleep(3 * time.Second)
                fmt.Println("休息一下,马上连接...")
                ConsumerData()
            }
        }()
        conn, err := grpc.Dial("localhost:6666",
            grpc.WithTransportCredentials(insecure.NewCredentials()))
    
        if err != nil {
            log.Println(err.Error())
            return err
        }
        defer conn.Close()
        //服务端流模式
        c := pb.NewLinkerClient(conn)
        ctx, _ := context.WithCancel(context.Background())
        stream, err := c.SubCabMessage(ctx, &pb.SubState{ConsumerGroup: "111", ClientId: "222"})
        if err != nil {
            log.Println("订阅grpc err", err.Error())
        }
    
        for {
            a, err := stream.Recv()
            if err == io.EOF {
                return nil
            }
            if err != nil {
                if strings.Contains(err.Error(), "Unavailable") { //防止运行过程中,服务端断线引起的 panic,则客户端重连
                    ConsumerData()
                }
                continue
            }
            log.Printf("handleMessage:%+v,处理完毕", a)
        }
    }
    

    第一步:建立连接
    第二步:创建客户端
    第三步:调用接口
    第四步:循环获取数据
    如果不是服务端流式 rpc 就调用一次就能获取结果,我们这个是服务端流式rpc ,所以得循环获取,实现了一次连接,服务端多次推送数据的效果

    关键的关键,实现了客户端断线重连
    仔细看一下返回的数据,都是之前proto 文件里定义好的结构体
    [图片上传中...(image.png-bbf33-1678095393319-0)]

    仔细看一下 *_grpc.pb.go grpc 文件代码,就能发现很多东西,比如客户端怎么从接口获取数据

    grpc 客户端就这么完成了

    grpc 服务端端

    package main
    
    import (
        "fmt"
        "net"
    
        pb "rpc/pb"
        "rpc/service"
    
        "google.golang.org/grpc"
    )
    
    func main() {
        server := grpc.NewServer()
        pb.RegisterLinkerServer(server, service.LinkerServer{})
    
        lis, err := net.Listen("tcp", ":6666")
        if err != nil {
            fmt.Println("err:", err.Error())
        }
        go service.LocalCall() //模拟服务端发送数据时刻,比如说由http接口触发grpc服务端发数据给grpc客户端
        err = server.Serve(lis)
        if err != nil {
            fmt.Println("err:", err.Error())
        }
    }
    
    
    package service
    
    import (
        "fmt"
        "log"
        pb "rpc/pb"
        "time"
    )
    
    type LinkerServer struct {
        pb.UnimplementedLinkerServer
    }
    
    var reschan = make(chan *pb.ReportData)
    
    func (s LinkerServer) SubCabMessage(sub_state *pb.SubState, stream pb.Linker_SubCabMessageServer) error {
        for data := range reschan {
            err := stream.Send(data)
            if err != nil {
                log.Println("发送grpc err", err.Error())
                return err
            }
        }
        return nil
    }
    
    func LocalCall() {
        n := 0
        for {
            fmt.Println("n:", n)
            server := pb.ReportData{
                CabinetNo: fmt.Sprintf("chenjuan%d", n),
                Topic:     "test",
                Content:   "{111}",
                Id:        "iddddd",
            }
            reschan <- &server
            time.Sleep(5 * time.Second)
            n++
        }
    
    }
    

    这个流程是由另一个http接口触发给grpc客户端发数据,并不是grpc客户端发起,由于grpc接口无法由本地调用,因此我暂时只能将要发送的数据通过channel 发送给SubCabMessage这个接口

    相关文章

      网友评论

          本文标题:grpc 初体验

          本文链接:https://www.haomeiwen.com/subject/nixpldtx.html