美文网首页GO与微服务
手撸golang GO与微服务 grpc

手撸golang GO与微服务 grpc

作者: 老罗话编程 | 来源:发表于2021-03-26 19:01 被阅读0次

    手撸golang GO与微服务 grpc

    缘起

    最近阅读 [Go微服务实战] (刘金亮, 2021.1)
    本系列笔记拟采用golang练习之
    gitee: https://gitee.com/ioly/learning.gooop

    GRPC

    gRPC是跨平台、跨语言并且效率非常高的RPC方式。
    
    gRPC默认使用protobuf。
    可以用proto files创建gRPC服务,
    用protobuf消息类型来定义方法参数和返回类型。
    
    建议在gRPC里使用proto3,
    因为这样可以使用gRPC支持的全部语言,
    并且能避免proto2客户端与proto3服务端交互时出现兼容性问题。
    
    在实战项目中使用gRPC时,
    一定要注意服务器的防火墙必须支持HTTP2.0,
    因为gRPC是基于HTTP2.0设计的。
    

    目标

    • 测试验证gRPC的四种通讯模式:
      • 请求-应答模式
      • 客户端推流模式
      • 客户端拉流模式
      • 双向流模式

    设计

    • hello.proto: 定义gRPC通讯协议
    • HelloServer.go: gRPC服务端的实现
    • pb/hello.pb.go: protoc生成的代码,源码略
    • logger/logger.go: 搜集运行日志以便诊断, 源码略

    单元测试

    grpc_test.go,依次在四种gRPC通讯模式下发送/接收1000条消息,并对所有消息日志进行校验

    package grpc
    
    import (
        "context"
        "fmt"
        "google.golang.org/grpc"
        "io"
        g "learning/gooop/grpc"
        "learning/gooop/grpc/logger"
        "learning/gooop/grpc/pb"
        "strconv"
        "sync"
        "testing"
        "time"
    )
    
    func Test_HelloServer(t *testing.T) {
        fnAssertTrue := func(b bool, msg string) {
            if !b {
                t.Fatal(msg)
            }
        }
        logger.Verbose(false)
    
        serverPort := 3333
        serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)
        iTotalMsgCount := 1000
    
        // start server
        srv := new(g.HelloServer)
        err := srv.BeginServeTCP(serverPort)
        if err != nil {
            t.Fatal(err)
        }
        time.Sleep(100 * time.Millisecond)
    
        // connect to grpc server
        conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
        if err != nil {
            t.Fatal(err)
        }
        defer conn.Close()
    
        // create grpc client
        client := pb.NewHelloServerClient(conn)
    
        // test SimpleRequest
        ctx := context.Background()
    
        for i := 0; i < iTotalMsgCount; i++ {
            msg := &pb.HelloMessage{
                Msg: fmt.Sprintf("SimpleRequest %d", i),
            }
            reply, err := client.SimpleRequest(ctx, msg)
            if err != nil {
                t.Fatal(err)
            }
            fnAssertTrue(reply.Msg == "reply "+msg.Msg, "invalid SimpleRequest response")
        }
        t.Log("passed SimpleRequest")
    
        // test ClientStream
        clientStream, err := client.ClientStream(ctx)
        if err != nil {
            t.Fatal(err)
        }
        for i := 0; i < iTotalMsgCount; i++ {
            msg := &pb.HelloMessage{
                Msg: fmt.Sprintf("ClientStream %08d", i),
            }
            err = clientStream.Send(msg)
            if err != nil {
                t.Fatal(err)
            }
        }
        reply, err := clientStream.CloseAndRecv()
        if err != nil {
            t.Fatal(err)
        }
        fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")
    
        // logger.Logf("HelloServer.ClientStream, recv %s", msg.String())
        for i := 0; i < iTotalMsgCount; i++ {
            log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)
            fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
        }
        t.Log("passed ClientStream")
    
        // test ServerStream
        serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})
        if err != nil {
            t.Fatal(err)
        }
    
        for {
            msg, err := serverStream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                t.Fatal(err)
            }
            logger.Logf("ServerStream.Recv %s", msg.Msg)
        }
    
        for i := 0; i < iTotalMsgCount; i++ {
            log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)
            fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
        }
        t.Log("passed ServerStream")
    
        // test DualStream
        dualStream, err := client.DualStream(ctx)
        var wg sync.WaitGroup
    
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < iTotalMsgCount; i++ {
                msg := &pb.HelloMessage{
                    Msg: fmt.Sprintf("DualStream.Send %08d", i),
                }
                err := dualStream.Send(msg)
                if err != nil {
                    t.Fatal(err)
                }
            }
    
            err = dualStream.CloseSend()
            if err != nil {
                t.Fatal(err)
            }
        }()
    
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                msg, err := dualStream.Recv()
                if err == io.EOF {
                    break
                }
    
                if err != nil {
                    t.Fatal(err)
                }
    
                logger.Logf("DualStream.Recv %s", msg.Msg)
            }
        }()
    
        wg.Wait()
        for i := 0; i < iTotalMsgCount; i++ {
            // Msg: "reply " + msg.Msg,
            // logger.Logf("DualStream.Recv %s", msg.Msg)
            log := fmt.Sprintf("DualStream.Recv reply DualStream.Send %08d", i)
            fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
        }
        t.Log("passed DualStream")
    }
    

    测试输出

    $ go test -v grpc_test.go 
    === RUN   Test_HelloServer
        grpc_test.go:60: passed SimpleRequest
        grpc_test.go:87: passed ClientStream
        grpc_test.go:110: passed ServerStream
        grpc_test.go:159: passed DualStream
    --- PASS: Test_HelloServer (0.79s)
    PASS
    ok      command-line-arguments  0.791s
    

    hello.proto

    定义四种通讯模式的rpc接口

    syntax = "proto3";
    
    package pb;
    
    option go_package="./pb";
    
    service HelloServer {
      rpc SimpleRequest(HelloMessage) returns (HelloMessage);
      rpc ClientStream(stream HelloMessage) returns (HelloMessage);
      rpc ServerStream(HelloMessage) returns (stream HelloMessage);
      rpc DualStream(stream HelloMessage) returns (stream HelloMessage);
    }
    
    message HelloMessage {
      string msg = 1;
    }
    

    HelloServer.go

    gRPC服务端的实现

    package grpc
    
    import (
        "context"
        "fmt"
        "google.golang.org/grpc"
        "io"
        "learning/gooop/grpc/logger"
        "learning/gooop/grpc/pb"
        "net"
        "strconv"
    )
    
    type HelloServer int
    
    func (me *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {
        //logger.Logf("HelloServer.SimpleRequest, %s", msg.Msg)
        msg.Msg = "reply " + msg.Msg
        return msg, nil
    }
    
    func (me *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                logger.Logf("HelloServer.ClientStream, EOF")
                break
            }
    
            if err != nil {
                logger.Logf("HelloServer.ClientStream, err=%v", err)
                return err
            }
    
            logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
        }
    
        err := stream.SendAndClose(&pb.HelloMessage{
            Msg: "reply ClientStream",
        })
        if err != nil {
            logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)
        }
        return nil
    }
    
    func (me *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {
        iMsgCount, err := strconv.Atoi(msg.Msg)
        if err != nil {
            return err
        }
    
        for i := 0; i < iMsgCount; i++ {
            msg := &pb.HelloMessage{
                Msg: fmt.Sprintf("ServerStream-%08d", i),
            }
            err := stream.Send(msg)
            if err != nil {
                return err
            }
        }
    
        return nil
    }
    
    func (me *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                return nil
            }
    
            if err != nil {
                logger.Logf("HelloServer.DualStream, recv err=%v", err)
                return err
            }
            logger.Logf("HelloServer.DualStream, recv msg=%v", msg.Msg)
    
            ret := &pb.HelloMessage{
                Msg: "reply " + msg.Msg,
            }
            err = stream.Send(ret)
            if err != nil {
                logger.Logf("HelloServer.DualStream, send err=%v", err)
                return err
            }
        }
    }
    
    func (me *HelloServer) BeginServeTCP(port int) error {
        listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
        if err != nil {
            return err
        }
    
        server := grpc.NewServer()
        pb.RegisterHelloServerServer(server, me)
        go func() {
            panic(server.Serve(listener))
        }()
    
        return nil
    }
    

    (end)

    相关文章

      网友评论

        本文标题:手撸golang GO与微服务 grpc

        本文链接:https://www.haomeiwen.com/subject/zklshltx.html