美文网首页
Python gRPC笔记

Python gRPC笔记

作者: Daisy丶 | 来源:发表于2020-11-21 23:28 被阅读0次

    简介:

    gRPC 是Google发布的一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。gRPC提供了支持多种编程语言的、对网络设备进行配置和纳管的方法。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持。

    文档:gRPC 官方文档中文版

    gRPC

    实现:

    环境

    • Python 3.8 or Anaconda3
    • grpcio
    • protobuf

    通过pip安装依赖

    pip install grpcio
    pip install protobuf
    pip install grpcio-tools
    

    实现proto文件

    gRPC通过 protocol buffers来定义 gRPC service 和方法 request 以及 response 的类型。

    要定义请求的和响应的数据类型,需要在 .proto 文件中指定关键字 message。要定义一个服务,需要 .proto 文件中指定关键字 service

    gRPC 允许你定义四类服务方法,通过stream关键字来控制:

    • 一元模式RPC,即客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用。
    • 服务端流模式 RPC,即客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
    • 客户端流模式 RPC,即客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
    • 双向流模式 RPC,即两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。

    如下一个简单地grpc proto文件demo.proto,定义了数据传输的格式和四种不同的服务类型:

    // protocal buffers 语法版本声明
    syntax="proto3";
    
    // message 定义了传输数据的格式,等号之后的数字是字段编号,每个编号都是唯一的。message结构类似于结构体
    message Request {
        int32 client_id = 1;
        string request_data = 2;
    }
    
    message Response {
        int32 server_id = 1;
        string response_data = 2;
    }
    
    // 定义gRPC服务方法
    service gRPCDemo {
        // 一元模式(在一次调用中, 客户端只能向服务器传输一次请求数据, 服务器也只能返回一次响应)
        rpc SimpleMethod(Request) returns (Response);
        // 客户端流模式(在一次调用中, 客户端可以多次向服务器传输数据, 但是服务器只能返回一次响应)
        rpc ClientStreamMethod(stream Request) returns (Response);
        // 服务端流模式(在一次调用中, 客户端只能一次向服务器传输数据, 但是服务器可以多次返回响应)
        rpc ServerStreamMethod(Request) returns (stream Response);
        // 双向流模式 (在一次调用中, 客户端和服务器都可以向对方多次收发数据)
        rpc BidirectStremMethod(stream Request) returns (stream Response);
    }
    

    通过下列命令可以根据demo.proto自动生成客户端和服务端的代码,我们需要做的只是继承里面实现具体处理逻辑的方法并实现他们,而不需要关系具体的传输逻辑。

    python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./demo.proto
    

    执行后会生成demo_pb2.pydemo_pb2_grpc.py两个文件,里面分别实现了消息类、服务的抽象类和客户端应用使用的函数。

    实现服务端与客户端

    服务端:

    继承gRPCDemoServicer的抽象类,实现proto里定义的函数。一元模式接收和返回的是request类和response类,流模式接收和返回的是类的迭代器。

    import grpc
    import demo_pb2
    import demo_pb2_grpc
    from concurrent import futures
    from threading import Thread
    
    
    SERVER_ID = 1
    SERVER_ADDRESS = "localhost:50051"
    
    
    class DemoService(demo_pb2_grpc.gRPCDemoServicer):
        def SimpleMethod(self, request, context):
            print('call simple method from client {}'.format(request.client_id))
            print('data details: {}'.format(request.request_data))
    
            reponse = demo_pb2.Response(server_id=SERVER_ID, response_data='simple data')
    
            return reponse
    
        def ClientStreamMethod(self,  request_iterator, context):
            for i, request in enumerate(request_iterator):
                print('data {} details: {}'.format(i, request.request_data))
    
            reponse = demo_pb2.Response(server_id=SERVER_ID, response_data='client stream data')
    
            return reponse
    
        def ServerStreamMethod(self, request, context):
            for i in range(5):
                response = demo_pb2.Response(server_id=SERVER_ID, response_data='server stream data {}'.format(i))
                yield response
    
        def BidirectStremMethod(self, request_iterator, context):
            # 开启一个子线程去接收数据
            def parse_request():
                for request in request_iterator:
                    print("recv from client(%d), message= %s" %
                          (request.client_id, request.request_data))
    
            t = Thread(target=parse_request)
            t.start()
    
            for i in range(5):
                yield demo_pb2.Response(
                    server_id=SERVER_ID,
                    response_data=("send by Python server, message= %d" % i))
    
            t.join()
    
    
    def main():
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
        demo_pb2_grpc.add_gRPCDemoServicer_to_server(DemoService(), server)
    
        server.add_insecure_port(SERVER_ADDRESS)
        print("------------------start Python GRPC server")
        server.start()
        server.wait_for_termination()
    
    
    if __name__ == '__main__':
        main()
    
    

    客户端:

    与服务端通过指定端口建立通道,通过gRPCDemoStub类实例化一个客户端向服务端发送请求。客户端可以像调用本地函数一样来调用服务端的函数。

    import grpc
    import time
    import demo_pb2
    import demo_pb2_grpc
    
    client_ID = 1
    SERVER_ADDRESS = "localhost:50051"
    
    
    def simple_method(stub):
        request = demo_pb2.Request(client_id=client_ID, request_data='simple call')
        response = stub.SimpleMethod(request)
        print(response.response_data)
    
    
    def client_stream_method(stub):
        # 创建一个生成器
        def request_messages():
            for i in range(5):
                request = demo_pb2.Request(
                    client_id=client_ID,
                    request_data=("called by Python client, message:%d" % i))
                yield request
    
        response = stub.ClientStreamingMethod(request_messages())
    
    
    def server_stream_method(stub):
        request = demo_pb2.Request(client_id=client_ID, request_data='server stream call')
        response = stub.ServerStreamMethod(request)
    
        for res in response:
            print(res.response_data)
    
    
    def bidirectional_streaming_method(stub):
        print("--------------Call BidirectionalStreamingMethod Begin---------------")
    
        # 创建一个生成器
        def request_messages():
            for i in range(5):
                request = demo_pb2.Request(
                    client_id=client_ID,
                    request_data=("called by Python client, message: %d" % i))
                yield request
                time.sleep(1)
    
        response_iterator = stub.BidirectionalStreamingMethod(request_messages())
        for response in response_iterator:
            print("recv from server(%d), message=%s" % (response.server_id, response.response_data))
    
        print("--------------Call BidirectionalStreamingMethod Over---------------")
    
    
    def main():
        MAX_MESSAGE_LENGTH = 1024 * 1024 * 10
    
        options = [('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
                   ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH)]
    
        with grpc.insecure_channel(SERVER_ADDRESS, options=options) as channel:
            stub = demo_pb2_grpc.gRPCDemoStub(channel)
            simple_method(stub)
            # server_stream_method(stub)
    
    
    if __name__ == '__main__':
        main()
    
    

    通信测试

    一元模式:

    服务端挂起在50051端口等待请求,客户端发起一次请求服务端对应的函数就被调用一次。


    server

    服务端流模式:

    服务端挂起在50051端口等待请求,客户端发起多次请求,服务端函数仅调用一次,将客户端的多次请求当做一个迭代器来处理。

    client server

    相关文章

      网友评论

          本文标题:Python gRPC笔记

          本文链接:https://www.haomeiwen.com/subject/lnmjnhtx.html