美文网首页微服务架构和实践Go语言Golang
gRPC双向数据流的交互控制(go语言实现)

gRPC双向数据流的交互控制(go语言实现)

作者: 阿狸不歌 | 来源:发表于2018-05-08 14:26 被阅读157次

gRPC简介

gRPC (https://grpc.io) 是一个由Google开发的高性能、开源、跨多种编程语言和通用的远程过程调用协议(RPC) 框架,用于客户端和服务器端之间的通信,使用HTTP/2协议并将 ProtoBuf (https://developers.google.com/protocol-buffers)作为序列化工具。


gRPC模式

gRPC主要有4种请求/响应模式,分别是:

(1) 简单模式(Simple RPC)

这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的RPC没有什么大的区别,所以不再详细介绍。

(2) 服务端数据流模式(Server-side streaming RPC)

这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。

(3) 客户端数据流模式(Client-side streaming RPC)

与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。

(4) 双向数据流模式(Bidirectional streaming RPC)

顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。


双向数据流实战

在gRPC中文文档(http://doc.oschina.net/grpc?t=60133)中有上述4种模式的实例,但是其中双向数据流的例子过于简单,没有体现出双向控制的特点,所以本文创建一个新的例子(用golang实现),用以展示gRPC双向数据流的交互(关于proto如何定义、相关包如何安装,在文档中都有介绍,所以本文略去此部分)。

1、proto定义

syntax = "proto3"; // 语法使用 protocol buffer proto3

// 包名: chat
package chat;   

/*
    服务名: Chat,
    其中只有 名为“BidStream”的一个RPC服务,
    输入是 Request格式的数据流, 输出是 Response 格式的数据流
*/
service Chat {  
    rpc BidStream(stream Request) returns (stream Response) {}
}

// 请求数据 Request格式定义
message Request {
    string input = 1;
}

// 响应数据Response格式定义
message Response {
    string output = 1;
}

服务端程序 server.go

package main

import (
    "io"
    "log"
    "net"
    "strconv"
    "google.golang.org/grpc"
    proto "chat" // 自动生成的 proto代码
)

// Streamer 服务端
type Streamer struct{}

// BidStream 实现了 ChatServer 接口中定义的 BidStream 方法
func (s *Streamer) BidStream(stream proto.Chat_BidStreamServer) error {
    ctx := stream.Context()
    for {
        select {
        case <-ctx.Done():
            log.Println("收到客户端通过context发出的终止信号")
            return ctx.Err()
        default:
            // 接收从客户端发来的消息
            输入, err := stream.Recv()
            if err == io.EOF {
                log.Println("客户端发送的数据流结束")
                return nil
            }
            if err != nil {
                log.Println("接收数据出错:", err)
                return err
            }

            // 如果接收正常,则根据接收到的 字符串 执行相应的指令
            switch 输入.Input {
            case "结束对话\n":
                log.Println("收到'结束对话'指令")
                if err := stream.Send(&proto.Response{Output: "收到结束指令"}); err != nil {
                    return err
                }
                // 收到结束指令时,通过 return nil 终止双向数据流
                return nil

            case "返回数据流\n":
                log.Println("收到'返回数据流'指令")
                // 收到 收到'返回数据流'指令, 连续返回 10 条数据
                for i := 0; i < 10; i++ {
                    if err := stream.Send(&proto.Response{Output: "数据流 #" + strconv.Itoa(i)}); err != nil {
                        return err
                    }
                }

            default:
                // 缺省情况下, 返回 '服务端返回: ' + 输入信息
                log.Printf("[收到消息]: %s", 输入.Input)
                if err := stream.Send(&proto.Response{Output: "服务端返回: " + 输入.Input}); err != nil {
                    return err
                }
            }
        }
    }
}

func main() {
    log.Println("启动服务端...")
    server := grpc.NewServer()

    // 注册 ChatServer
    proto.RegisterChatServer(server, &Streamer{})

    address, err := net.Listen("tcp", ":3000")
    if err != nil {
        panic(err)
    }

    if err := server.Serve(address); err != nil {
        panic(err)
    }
}

客户端程序 client.go

package main

import (
    "bufio"
    "context"
    "io"
    "log"
    "os"

    "google.golang.org/grpc"
    proto "chat" // 根据proto文件自动生成的代码
)

func main() {
    // 创建连接
    conn, err := grpc.Dial("localhost:3000", grpc.WithInsecure())
    if err != nil {
        log.Printf("连接失败: [%v]\n", err)
        return
    }
    defer conn.Close()
    
    // 声明客户端
    client := proto.NewChatClient(conn)

    // 声明 context
    ctx := context.Background()

    // 创建双向数据流
    stream, err := client.BidStream(ctx)
    if err != nil {
        log.Printf("创建数据流失败: [%v]\n", err)
    }

    // 启动一个 goroutine 接收命令行输入的指令
    go func() {
        log.Println("请输入消息...")
        输入 := bufio.NewReader(os.Stdin)
        for {
            // 获取 命令行输入的字符串, 以回车 \n 作为结束标志
            命令行输入的字符串, _ := 输入.ReadString('\n')

            // 向服务端发送 指令
            if err := stream.Send(&proto.Request{Input: 命令行输入的字符串}); err != nil {
                return
            }
        }
    }()

    for {
        // 接收从 服务端返回的数据流
        响应, err := stream.Recv()
        if err == io.EOF {
            log.Println("⚠️ 收到服务端的结束信号")
            break   //如果收到结束信号,则退出“接收循环”,结束客户端程序
        }

        if err != nil {
            // TODO: 处理接收错误
            log.Println("接收数据出错:", err)
        }
        
        // 没有错误的情况下,打印来自服务端的消息
        log.Printf("[客户端收到]: %s", 响应.Output)
    }
}

运行效果

先启动服务端程序 server.go
再启动客户端程序 client.go

输入消息,结果类似下图:

运行截图

总结

gRPC是个很强大的RPC框架,而且支持多语言编程,上面的服务端、客户端程序我们完全可以用不同的语言实现,比如服务端用JAVA,客户端用Python...

gRPC的四种交互模式也给我们提供了很大的发挥空间,最近Nginx宣布支持gRPC,这可能也预示着某种趋势...


nginx + grpc

相关文章

网友评论

  • xyt001:大佬 使用这个 流是不是 就不用考虑 tcp的分包 粘包了?
    xyt001:@阿狸不歌 恩恩
    阿狸不歌:@xyt001 gRPC把这些事都做好了,直接用就好了
  • LucasJin:为啥那个输入的go rutine不执行,直接就是for循环了啊,然后直接死循环了
    阿狸不歌:不明白你说的意思?代码无法执行吗?
  • 取个帅气的昵称华华:你源代码有吗,我写了一遍有错误
    阿狸不歌:@取个帅气的昵称华华
    除了proto 自动生成的代码,原始代码都在这里了,请问报的什么错?
  • IT人故事会:写的不错!
    阿狸不歌:@xyt001 通过Websocket与gRPC交互 | gRPC双向数据流的交互控制系列(2)https://www.jianshu.com/p/b325a2848275
    xyt001:没看到websocket在哪呀
    阿狸不歌:谢谢😄

本文标题:gRPC双向数据流的交互控制(go语言实现)

本文链接:https://www.haomeiwen.com/subject/umiprftx.html