美文网首页
go语言grpc学习笔记

go语言grpc学习笔记

作者: 不能吃的坚果j | 来源:发表于2021-03-18 11:04 被阅读0次

    本文作者:陈进坚
    个人博客:https://jian1098.github.io
    CSDN博客:https://blog.csdn.net/c_jian
    简书:https://www.jianshu.com/u/8ba9ac5706b6
    联系方式:jian1098@qq.com

    grpc教程


    视频:https://www.bilibili.com/video/BV1GE411A7kp

    代码:https://github.com/zhuge20100104/grpc-demo

    打开go-module


    set GO111MODULE=on    //windows
    export GO111MODULE=on //linux
    

    编辑器settings-GO-Go Modules勾选

    安装protoc


    Linux

    创建目录

    mkdir protoc
    cd protoc
    

    打开https://github.com/protocolbuffers/protobuf/releases下载最新版本的protoc-*-linux-x86_64.zip

    wget https://github.com/protocolbuffers/protobuf/releases/download/v3.12.3/protoc-3.12.3-linux-x86_64.zip
    

    解压

    unzip protoc-3.12.3-linux-x86_64.zip
    

    添加到path

    vim /etc/profile
    

    把你的bin路径加到最后保存

    export PATH=$PATH:/root/protoc/bin
    

    刷新配置表

    source /etc/profile
    

    查看版本

    protoc --version
    

    Windows

    打开https://github.com/protocolbuffers/protobuf/releases下载最新版本的protoc-*-win64.zip

    新建一个文件夹并解压,然后把bin目录添加到环境变量即可

    查看版本

    protoc --version
    

    安装protoc-gen-go


    执行命令

    go get -u github.com/golang/protobuf/protoc-gen-go
    

    然后会在$GOPATH/bin目录下发现protoc-gen-go(Windows下为protoc-gen-go.exe)

    安装IDE插件


    此步骤可选

    在goland插件库中安装Protobuf Support

    grpc流程


    创建proto文件

    创建pbfiles/Prod.proto文件,复制下面的代码保存

    syntax="proto3";
    option go_package = ".;services";
    package services;
    
    // ProdRequest carries the id of the product being queried.
    message ProdRequest{
        int32 prod_id=1;
    }
    // ProdResponse carries the stock value returned for a product.
    // The name must match the ProdResponse type used by the Go code.
    message ProdResponse{
        int32 pro_stock=1;
    }
    

    生成.pb.go文件

    创建services目录然后在pbfiles目录下执行命令

    protoc --go_out=../services Prod.proto
    

    会得到services/Prod.pb.go文件

    pbfiles/Prod.proto文件新增服务代码

    // ProdService is the gRPC service; its name and the GetProdStock method
    // name must match the identifiers used by the Go server and client
    // (RegisterProdServiceServer, NewProdServiceClient, GetProdStock).
    service ProdService{
        rpc GetProdStock (ProdRequest) returns (ProdResponse);
    }
    

    执行下面的命令

    protoc --go_out=plugins=grpc:../services Prod.proto
    

    services/Prod.pb.go文件会生成更多的代码

    创建业务逻辑文件

    在生成的.pb.go文件中找到GetProdStock的接口,然后复制,创建services/ProdService.go,然后实现GetProdStock方法的具体逻辑

    package services
    
    import (
        "context"
    )
    
    // ProdService is the server-side implementation of the product stock service.
    type ProdService struct{}
    
    // GetProdStock handles the GetProdStock RPC. The request is currently not
    // inspected: a fixed stock of 20 is returned for every product, and the
    // returned error is always nil.
    func (s *ProdService) GetProdStock(ctx context.Context, request *ProdRequest) (*ProdResponse, error) {
        return &ProdResponse{ProStock: 20}, nil
    }
    

    创建服务端

    创建server/server.go,并写入以下代码

    package main
    
    import (
        "google.golang.org/grpc"
        "grpc-test/services"
        "log"
        "net"
    )
    
    // main starts a gRPC server with ProdService registered and serves it on
    // TCP port 8081. Any listen or serve failure terminates the process.
    func main() {
        listener, err := net.Listen("tcp", ":8081")
        if err != nil {
            log.Fatal(err)
        }
    
        srv := grpc.NewServer()
        services.RegisterProdServiceServer(srv, new(services.ProdService))
    
        // Serve gRPC over the plain TCP listener; Serve blocks until it fails.
        if err := srv.Serve(listener); err != nil {
            log.Fatal(err)
        }
    }
    

    创建客户端

    创建client/client.go,并写入以下代码

    package main
    
    import (
        "context"
        "fmt"
        "grpc-test/services"
        "log"
        "google.golang.org/grpc"
    )
    
    // main dials the gRPC server on :8081, asks for the stock of product 12
    // and prints the returned value.
    func main() {
        // grpc.WithInsecure(): connect without certificates.
        conn, dialErr := grpc.Dial(":8081", grpc.WithInsecure())
        if dialErr != nil {
            log.Fatalf("连接GRPC服务端失败 %v\n", dialErr)
        }
        defer conn.Close()
    
        client := services.NewProdServiceClient(conn)
        request := &services.ProdRequest{ProdId: 12}
        reply, callErr := client.GetProdStock(context.Background(), request)
        if callErr != nil {
            log.Fatalf("请求GRPC服务端失败 %v\n", callErr)
        }
    
        fmt.Println(reply.ProStock)
    }
    

    启动服务

    在命令行执行go run server/server.go,然后在另一个终端执行go run client/client.go即可

    同时提供rpc和http服务

    同时提供rpc和http服务的grpc框架

    https://github.com/grpc-ecosystem/grpc-gateway

    第三方字段验证库

    除了自行对参数字段进行验证,也可以选用第三方库验证字段

    github.com/envoyproxy/protoc-gen-validate/validate
    

    流模式


    服务端流

    User.proto

    syntax = "proto3";
    
    package services;
    
    // UserInfo is not declared in this file; presumably it comes from
    // Model.proto (not shown here) — verify.
    import "Model.proto";
    
    // UserScoreRequest carries the list of users to be scored.
    message UserScoreRequest {
        repeated UserInfo users = 1;
    }
    
    // UserScoreResponse carries a list of users in the reply.
    message UserScoreResponse {
        repeated UserInfo users = 1;
    }
    
    // UserService offers a unary and a server-streaming variant of the
    // user score lookup.
    service UserService {
        rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse) {}
        rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse) {}  // declares the server-streaming RPC
    }
    

    服务端UserService.go

    package services
    import context "context"
    
    // UserService implements the user score RPC handlers.
    type UserService struct{}
    
    // GetUserScore assigns each user in req a score, starting at 100 and
    // increasing by one per user in request order, and returns the same user
    // objects in a single response. The error is always nil.
    func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
        const baseScore int32 = 100
        scored := make([]*UserInfo, 0, len(req.Users))
        for i, u := range req.Users {
            u.UserScore = baseScore + int32(i)
            scored = append(scored, u)
        }
        return &UserScoreResponse{Users: scored}, nil
    }
    
    // GetUserScoreByServerStream assigns scores to the users in req, starting at
    // 100 and increasing by one per user, and streams them back to the client in
    // batches of at most two users per response message.
    //
    // It returns the first error reported by stream.Send, or nil once every user
    // has been sent.
    func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
        stream UserService_GetUserScoreByServerStreamServer) error {
        const batchSize = 2
        var score int32 = 100
        batch := make([]*UserInfo, 0, batchSize)
        for _, user := range req.Users {
            user.UserScore = score
            score++
            batch = append(batch, user)
            if len(batch) < batchSize {
                continue
            }
            // Send the full batch to the client.
            if err := stream.Send(&UserScoreResponse{Users: batch}); err != nil {
                return err
            }
            // Start a fresh slice instead of truncating with batch[:0]: the
            // message handed to Send would otherwise share its backing array
            // with the next batch and be overwritten after it was sent.
            batch = make([]*UserInfo, 0, batchSize)
        }
        // Send the trailing partial batch, if any.
        if len(batch) > 0 {
            if err := stream.Send(&UserScoreResponse{Users: batch}); err != nil {
                return err
            }
        }
        return nil
    }
    

    客户端

    package main
    
    import (
        "context"
        "fmt"
        "io"
        "log"
    
        "github.com/zhuge20100104/grpc-demo/grpc-13/client/helper"
    
        "github.com/zhuge20100104/grpc-demo/grpc-13/client/services"
    
        "google.golang.org/grpc"
    )
    
    // main connects to the gRPC server on :8081 using the credentials supplied
    // by helper.GetClientCredentials, sends six users (ids 1..6) through the
    // server-streaming RPC and prints every batch the server streams back.
    func main() {
        conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
        if err != nil {
            log.Fatalf("连接GRPC服务端失败 %v\n", err)
        }
    
        defer conn.Close()
    
        userClient := services.NewUserServiceClient(conn)
    
        const userCount = 6
        users := make([]*services.UserInfo, 0, userCount)
        for id := int32(1); id <= userCount; id++ {
            users = append(users, &services.UserInfo{UserId: id})
        }
    
        stream, err := userClient.GetUserScoreByServerStream(context.Background(),
            &services.UserScoreRequest{Users: users},
        )
        if err != nil {
            log.Fatalf("请求GRPC服务端失败 %v\n", err)
        }
    
        for {
            // Read the next message from the stream.
            userRes, err := stream.Recv()
            if err == io.EOF {
                // The server finished the stream normally.
                break
            }
            if err != nil {
                // Stop here: the response must not be used when Recv fails,
                // and retrying Recv on a failed stream would loop forever.
                log.Fatalf("读取服务端流失败 err: %v\n", err)
            }
            fmt.Println(userRes.Users)
        }
    }
    

    客户端流

    服务端

    package services
    
    import (
        context "context"
        "io"
    )
    
    // UserService implements the user score RPC handlers.
    type UserService struct{}
    
    // GetUserScore assigns each user in req a score, starting at 100 and
    // increasing by one per user in request order, and returns the same user
    // objects in a single response. The error is always nil.
    func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
        const baseScore int32 = 100
        scored := make([]*UserInfo, 0, len(req.Users))
        for i, u := range req.Users {
            u.UserScore = baseScore + int32(i)
            scored = append(scored, u)
        }
        return &UserScoreResponse{Users: scored}, nil
    }
    
    // GetUserScoreByServerStream assigns scores to the users in req, starting at
    // 100 and increasing by one per user, and streams them back to the client in
    // batches of at most two users per response message.
    //
    // It returns the first error reported by stream.Send, or nil once every user
    // has been sent.
    func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
        stream UserService_GetUserScoreByServerStreamServer) error {
        const batchSize = 2
        var score int32 = 100
        batch := make([]*UserInfo, 0, batchSize)
        for _, user := range req.Users {
            user.UserScore = score
            score++
            batch = append(batch, user)
            if len(batch) < batchSize {
                continue
            }
            if err := stream.Send(&UserScoreResponse{Users: batch}); err != nil {
                return err
            }
            // Start a fresh slice instead of truncating with batch[:0]: the
            // message handed to Send would otherwise share its backing array
            // with the next batch and be overwritten after it was sent.
            batch = make([]*UserInfo, 0, batchSize)
        }
        // Send the trailing partial batch, if any.
        if len(batch) > 0 {
            if err := stream.Send(&UserScoreResponse{Users: batch}); err != nil {
                return err
            }
        }
        return nil
    }
    
    // GetUserScoreByClientStream reads request messages until the client ends
    // the stream, scoring every received user (100, 101, ... in arrival order).
    // When Recv reports io.EOF it replies once with all collected users via
    // SendAndClose and returns that call's result; any other Recv error is
    // returned as is.
    func (*UserService) GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error {
        nextScore := int32(100)
        collected := []*UserInfo{}
        for {
            req, err := stream.Recv()
            switch {
            case err == io.EOF:
                return stream.SendAndClose(&UserScoreResponse{Users: collected})
            case err != nil:
                return err
            }
    
            for _, u := range req.Users {
                u.UserScore = nextScore
                nextScore++
                collected = append(collected, u)
            }
        }
    }
    

    客户端

    package main
    
    import (
        "context"
        "fmt"
        "log"
    
        "github.com/zhuge20100104/grpc-demo/grpc-14/client/helper"
    
        "github.com/zhuge20100104/grpc-demo/grpc-14/client/services"
    
        "google.golang.org/grpc"
    )
    
    // main connects to the gRPC server on :8081 using the credentials supplied
    // by helper.GetClientCredentials, sends three request messages of five
    // users each through the client-streaming RPC, then closes the stream and
    // prints every user of the single response.
    func main() {
        conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
        if err != nil {
            log.Fatalf("连接GRPC服务端失败 %v\n", err)
        }
    
        defer conn.Close()
    
        userClient := services.NewUserServiceClient(conn)
    
        stream, err := userClient.GetUserScoreByClientStream(context.Background())
        if err != nil {
            log.Fatalf("请求GRPC服务端失败 %v\n", err)
        }
    
        const (
            messageCount    = 3
            usersPerMessage = 5
        )
        for n := 0; n < messageCount; n++ {
            req := &services.UserScoreRequest{
                Users: make([]*services.UserInfo, 0, usersPerMessage),
            }
            for id := int32(1); id <= usersPerMessage; id++ {
                req.Users = append(req.Users, &services.UserInfo{UserId: id})
            }
            if err := stream.Send(req); err != nil {
                // Stop sending on a failed stream; CloseAndRecv below reports
                // the final status of the call.
                log.Printf("发送请求失败 %v\n", err)
                break
            }
        }
    
        res, err := stream.CloseAndRecv()
        if err != nil {
            log.Fatalf("接收服务端请求失败 %v\n", err)
        }
    
        for _, user := range res.Users {
            fmt.Println(user)
        }
    }
    

    双向流

    服务端UserService.go

    package services
    
    import (
        context "context"
        "io"
    )
    
    // UserService implements the user score RPC handlers.
    type UserService struct{}
    
    // GetUserScore assigns each user in req a score, starting at 100 and
    // increasing by one per user in request order, and returns the same user
    // objects in a single response. The error is always nil.
    func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
        const baseScore int32 = 100
        scored := make([]*UserInfo, 0, len(req.Users))
        for i, u := range req.Users {
            u.UserScore = baseScore + int32(i)
            scored = append(scored, u)
        }
        return &UserScoreResponse{Users: scored}, nil
    }
    
    // GetUserScoreByServerStream assigns scores to the users in req, starting at
    // 100 and increasing by one per user, and streams them back to the client in
    // batches of at most two users per response message.
    //
    // It returns the first error reported by stream.Send, or nil once every user
    // has been sent.
    func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
        stream UserService_GetUserScoreByServerStreamServer) error {
        const batchSize = 2
        var score int32 = 100
        batch := make([]*UserInfo, 0, batchSize)
        for _, user := range req.Users {
            user.UserScore = score
            score++
            batch = append(batch, user)
            if len(batch) < batchSize {
                continue
            }
            if err := stream.Send(&UserScoreResponse{Users: batch}); err != nil {
                return err
            }
            // Start a fresh slice instead of truncating with batch[:0]: the
            // message handed to Send would otherwise share its backing array
            // with the next batch and be overwritten after it was sent.
            batch = make([]*UserInfo, 0, batchSize)
        }
        // Send the trailing partial batch, if any.
        if len(batch) > 0 {
            if err := stream.Send(&UserScoreResponse{Users: batch}); err != nil {
                return err
            }
        }
        return nil
    }
    
    // GetUserScoreByClientStream reads request messages until the client ends
    // the stream, scoring every received user (100, 101, ... in arrival order).
    // When Recv reports io.EOF it replies once with all collected users via
    // SendAndClose and returns that call's result; any other Recv error is
    // returned as is.
    func (*UserService) GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error {
        nextScore := int32(100)
        collected := []*UserInfo{}
        for {
            req, err := stream.Recv()
            switch {
            case err == io.EOF:
                return stream.SendAndClose(&UserScoreResponse{Users: collected})
            case err != nil:
                return err
            }
    
            for _, u := range req.Users {
                u.UserScore = nextScore
                nextScore++
                collected = append(collected, u)
            }
        }
    }
    
    // GetUserScoreByTWS serves the bidirectional streaming RPC: for every request
    // message received it scores the contained users and immediately sends them
    // back in one response message. Scores start at 100 and keep increasing
    // across the messages of one stream.
    //
    // It returns nil when Recv reports io.EOF, and otherwise the first error
    // returned by Recv or Send.
    func (*UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error {
        var score int32 = 100
        for {
            req, err := stream.Recv()
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
    
            // Build a new slice per response so a message already handed to
            // Send never shares its backing array with later responses.
            users := make([]*UserInfo, 0, len(req.Users))
            for _, user := range req.Users {
                user.UserScore = score
                users = append(users, user)
                score++
            }
    
            // A failed Send means the reply was not delivered; stop instead of
            // silently continuing to read requests.
            if err := stream.Send(&UserScoreResponse{Users: users}); err != nil {
                return err
            }
        }
    }
    

    客户端

    package main
    
    import (
        "context"
        "fmt"
        "io"
        "log"
        "github.com/zhuge20100104/grpc-demo/grpc-15/client/helper"
        "github.com/zhuge20100104/grpc-demo/grpc-15/client/services"
        "google.golang.org/grpc"
    )
    
    // main connects to the gRPC server on :8081 using the credentials supplied
    // by helper.GetClientCredentials and drives the bidirectional streaming RPC
    // in lockstep: it sends a request of five users, waits for and prints the
    // matching response, and repeats this three times before closing its send
    // side of the stream.
    func main() {
        conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
        if err != nil {
            log.Fatalf("连接GRPC服务端失败 %v\n", err)
        }
    
        defer conn.Close()
    
        userClient := services.NewUserServiceClient(conn)
    
        stream, err := userClient.GetUserScoreByTWS(context.Background())
        if err != nil {
            log.Fatalf("请求GRPC服务端失败 %v\n", err)
        }
    
        const (
            messageCount    = 3
            usersPerMessage = 5
        )
        for n := 0; n < messageCount; n++ {
            req := &services.UserScoreRequest{
                Users: make([]*services.UserInfo, 0, usersPerMessage),
            }
            for id := int32(1); id <= usersPerMessage; id++ {
                req.Users = append(req.Users, &services.UserInfo{UserId: id})
            }
            // io.EOF from Send means the stream already ended; the Recv call
            // below then reports the final status, so only other errors are
            // fatal here.
            if err := stream.Send(req); err != nil && err != io.EOF {
                log.Fatalf("发送请求失败 %v\n", err)
            }
    
            res, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatalf("接收服务端请求失败 %v\n", err)
            }
            fmt.Println(res.Users)
        }
    
        // Signal that no more requests will be sent so the server's Recv
        // loop sees the end of the stream.
        if err := stream.CloseSend(); err != nil {
            log.Fatalf("关闭发送流失败 %v\n", err)
        }
    }
    

    相关文章

      网友评论

          本文标题:go语言grpc学习笔记

          本文链接:https://www.haomeiwen.com/subject/bcyucltx.html