美文网首页Go知识库
go语言grpc的stream 使用

go语言grpc的stream 使用

作者: xyt001 | 来源:发表于2018-12-19 23:39 被阅读169次

    前言:

    之前我们讲了 grpc 怎么简单的使用 ,这次讲讲 grpc 中的 stream,srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。

    stream的种类:

    1. 客户端推送 服务端 rpc GetStream (StreamReqData) returns (stream StreamResData){}
    2. 服务端推送 客户端 rpc PutStream (stream StreamReqData) returns (StreamResData){}
    3. 客户端与 服务端 互相 推送 rpc AllStream (stream StreamReqData) returns (stream StreamResData){}

    其实这个流 已经 基本退化成 tcp了,grpc 底层为我们 分包了,所以真的很方便。

    protobuf的定义:

    syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
    
    //声明 包名
    package pro;
    
    //声明grpc服务
    service Greeter {
       /*
       以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
       */
      rpc GetStream (StreamReqData) returns (stream StreamResData){}
      rpc PutStream (stream StreamReqData) returns (StreamResData){}
      rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
    }
    
    
    //stream请求结构
    message StreamReqData {
       string data = 1;
    }
    //stream返回结构
    message StreamResData {
       string data = 1;
    }
    

    我们在 protobuf 里面 定义 要提供的服务,如果 你想把哪个数据 源源不断的 推送 就在前面加个stream 就好了,定义好记得编译。

    服务端的实现:

    package main
    
    import (
        "context"
        "fmt"
        "google.golang.org/grpc"
        "grpc/pro"
        "log"
        "net"
        "sync"
        "time"
    )
    
    const PORT  = ":50051"
    
    type server struct {
    }
    
    //服务端 单向流
    func (s *server)GetStream(req *pro.StreamReqData, res pro.Greeter_GetStreamServer) error{
        i:= 0
        for{
            i++
            res.Send(&pro.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})
            time.Sleep(1*time.Second)
            if i >10 {
                break
            }
        }
        return nil
    }
    
    //客户端 单向流
    func (this *server) PutStream(cliStr pro.Greeter_PutStreamServer) error {
    
        for {
            if tem, err := cliStr.Recv(); err == nil {
                log.Println(tem)
            } else {
                log.Println("break, err :", err)
                break
            }
        }
    
        return nil
    }
    
    //客户端服务端 双向流
    func(this *server) AllStream(allStr pro.Greeter_AllStreamServer) error {
    
        wg := sync.WaitGroup{}
        wg.Add(2)
        go func() {
            for {
                data, _ := allStr.Recv()
                log.Println(data)
            }
            wg.Done()
        }()
    
        go func() {
            for {
                allStr.Send(&pro.StreamResData{Data:"ssss"})
                time.Sleep(time.Second)
            }
            wg.Done()
        }()
    
        wg.Wait()
        return nil
    }
    
    func main(){
        //监听端口
        lis,err := net.Listen("tcp",PORT)
        if err != nil{
            return
        }
        //创建一个grpc 服务器
        s := grpc.NewServer()
        //注册事件
        pro.RegisterGreeterServer(s,&server{})
        //处理链接
        s.Serve(lis)
    }
    

    知识点:

    1. 每个函数都对应着 完成了 protobuf 里面的 定义。
    2. 每个函数 形参都有对应的 推送 或者 接收 对象,我们只要 不断循环 Recv(),或者 Send() 就能接收或者推送了!
    3. 当return出函数,就说明此次 推送 或者 接收 结束了,client 会 对应的 收到消息!

    客户端调用:

    package main
    
    import (
        "google.golang.org/grpc"
    
        "grpc/pro"
        "log"
        "context"
        "time"
        _ "google.golang.org/grpc/balancer/grpclb"
    )
    
    const (
        ADDRESS = "localhost:50051"
    )
    
    
    func main(){
        //通过grpc 库 建立一个连接
        conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())
        if err != nil{
            return
        }
        defer conn.Close()
        //通过刚刚的连接 生成一个client对象。
        c := pro.NewGreeterClient(conn)
        //调用服务端推送流
        reqstreamData := &pro.StreamReqData{Data:"aaa"}
        res,_ := c.GetStream(context.Background(),reqstreamData)
        for {
            aa,err := res.Recv()
            if err != nil {
                log.Println(err)
                break
            }
            log.Println(aa)
        }
        //客户端 推送 流
        putRes, _ := c.PutStream(context.Background())
        i := 1
        for {
            i++
            putRes.Send(&pro.StreamReqData{Data:"ss"})
            time.Sleep(time.Second)
            if i > 10 {
                break
            }
        }
        //服务端 客户端 双向流
        allStr,_ := c.AllStream(context.Background())
        go func() {
            for {
                data,_ := allStr.Recv()
                log.Println(data)
            }
        }()
    
        go func() {
            for {
                allStr.Send(&pro.StreamReqData{Data:"ssss"})
                time.Sleep(time.Second)
            }
        }()
    
        select {
        }
    
    }
    

    client 调用 流的函数, 就会 返回一个 流对象,只要 不断地 对它进行读取或者写入,对应方就能收到。

    总结:

    grpc 的 stream 和 go的协程 配合 简直完美。通过流 我们 可以更加 灵活的 实现自己的业务。如 订阅,大数据传输等。

    相关文章

      网友评论

        本文标题:go语言grpc的stream 使用

        本文链接:https://www.haomeiwen.com/subject/rouuhqtx.html