美文网首页
使用kazoo改写grpc程序

使用kazoo改写grpc程序

作者: bwisgood | 来源:发表于2019-08-06 00:36 被阅读0次

    之前我们用过grpc来实现过远程过程调用,这次我们将Zookeeper加入到我们的实践当中,所以需要改写其中的一部分程序
    server:
    在实现服务端的时候我们需要在开始服务之前将自己的服务信息注册到Zookeeper去,值得注意的是当我们需要测试断开连接是否会从Zookeeper中取消注册的时候我们最好在关闭的时候使用KazooClient.stop()方法,要不然连接不会瞬间关闭而是有一定的延迟,得出的测试效果也会大大折扣

    import req_pb2_grpc as PService
    import req_pb2 as Mess
    import grpc
    from concurrent import futures
    
    
    # 实现被调用的具体代码
    class DemoService(PService.DemoServicer):
    
        def __init__(self):
            self.city_db = {
                "beijing": [' python ', 'c++', 'go'],
                "shanghai": [' 产品 ', '123', '数学'],
                "wuhan": [' 语文 ', '英语', '数学']
            }
            self.answers = list(range(10))
    
        def Calculate(self, request, context):
            if request.num1 == 0:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details("cannot divide by 0")
                return Mess.Result()
    
            if request.op == Mess.Work.ADD:
                result = request.num1 + request.num2
            elif request.op == Mess.Work.SUBTRACT:
                result = request.num1 - request.num2
            else:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details("invalid op parameter")
                return Mess.Result()
    
            return Mess.Result(result=result)
    
            pass
    
        def GetCommunity(self, request, context):
            city = request.name
            subs = self.city_db.get(city)
            print("*" * 100)
            for sub in subs:
                import time
                # time.sleep(1)
                yield Mess.Subject(name=sub)
    
        def Accumulate(self, request_iterator, context):
            sum = 0
            for num in request_iterator:
                sum += num.val
                print(sum)
            return Mess.Sum(val=sum)
    
        def GuessNumber(self, request_iterator, context):
            for request in request_iterator:
                if request.val in self.answers:
                    print(request.val)
                    yield Mess.Answer(val=request.val, desc='bingo')
    
    
    from kazoo.client import KazooClient
    import json
    import sys
    
    
    class DistributeServer(object):
        def register_zookeeper(self, data):
            """
            注册到zookeeper
            :return:
            """
            # 创建kazoo客户端
            self.zk = KazooClient("127.0.0.1:2181")
            # 建立连接
            self.zk.start()
            # 在zookeeper中创建节点信息
            self.zk.ensure_path("/rpc")
            self.zk.create('/rpc/server', data, ephemeral=True, sequence=True)
            print("成功注册到zookeeper:", data.decode())
    
        # 开启服务器
        def serve(self, host, port):
            # 创建服务器-对象 多线程服务器
            server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
            # 注册实现的服务方法到服务器对象中
            PService.add_DemoServicer_to_server(DemoService(), server)
            # 设置服务地址
            server.add_insecure_port('{}:{}'.format(host, port))
            # 开启服务
            print('start server')
            # 注册到zookeeper
            addr = {"host": host, "port": port}
            self.register_zookeeper(json.dumps(addr).encode())
    
            server.start()
            # 关闭服务
            import time
    
            try:
                time.sleep(1000)
            except KeyboardInterrupt:
                self.zk.stop()
                server.stop(0)
                print("stop server")
            pass
    
    
    if __name__ == '__main__':
        port = sys.argv[1]
        host = sys.argv[2]
        server = DistributeServer()
        server.serve(port, host)
    

    client:
    在客户端需要进行rpc调用的时候首先从Zookeeper中获取当前的正常运行的服务有哪些,然后从中选取一个服务器去进行连接

    import grpc
    from grpc_demo import req_pb2_grpc, req_pb2
    
    
    def invoke(stub):
        work = req_pb2.Work()
        work.num1 = 1
        work.num2 = 2
        work.op = req_pb2.Work.ADD
        result = stub.Calculate(work)
        print("result :", result.result)
    
        work.num1 = 0
        try:
            result = stub.Calculate(work)
        except grpc.RpcError as e:
            print('{}:{}'.format(e.code(), e.details()))
    
    
    def invoke_get_sub(stub):
        city = req_pb2.City(name="beijing")
        subs = stub.GetCommunity(city)
        for sub in subs:
            print(sub.name)
    
    
    def generate_delta():
        for i in range(10):
            import random
            import time
            time.sleep(0.5)
            delta = random.randint(1, 100)
            yield req_pb2.Delta(val=delta)
    
    
    def invoke_sum(stub):
        delta_iterator = generate_delta()
        sum = stub.Accumulate(delta_iterator)
        print(sum.val)
    
    
    def generate_number():
        for i in range(10):
            import random
            import time
            time.sleep(0.5)
            delta = random.randint(1, 100)
            yield req_pb2.Number(val=delta)
    
    
    def invoke_guess(stub):
        number_iterator = generate_number()
        answers = stub.GuessNumber(number_iterator)
        for an in answers:
            print("{}:{}".format(an.val, an.desc))
    
    
    from kazoo.client import KazooClient
    from kazoo.exceptions import KazooException
    import json
    
    
    class Client(object):
        def __init__(self):
            self.zk = KazooClient("127.0.0.1:2181")
            self.zk.start()
            self.servers = []
            self.get_servers()
    
        def get_servers(self, event=None):
            # 有变化的话回调g重新获取server列表
            servers = self.zk.get_children("/rpc", watch=self.get_servers)
            print("="*20)
            print("更新了!!!!!")
            print("="*20)
    
            if not servers:
                raise KazooException("暂无子节点")
            for server in servers:
                # server_list.append("/rpc/" + json(server_data))
                addr_data, node_state = self.zk.get("/rpc/" + server)
                self.servers.append(addr_data.decode())
    
        def get_server(self):
            import random
            server = json.loads(random.choice(self.servers))
            return "{}:{}".format(server["host"], server["port"])
    
        def run(self):
            # 获取分布式的channel 从zookeeper中获取
            ipport = self.get_server()
            print("REQUEST: ", ipport)
            # channel = None
            while True:
                try:
                    channel = grpc.insecure_channel(ipport)
                except Exception:
                    continue
                else:
                    break
            # with grpc.insecure_channel(ipport) as channel:
            # 创建stub对象
            stub = req_pb2_grpc.DemoStub(channel)
            # invoke_get_sub(stub)
            try:
                invoke_get_sub(stub)
                # invoke(stub)
            except Exception as e:
                pass
            # invoke_guess(stub)
    
            channel.close()
    
    
    if __name__ == '__main__':
        import time
    
        ins = Client()
        for i in range(50):
            time.sleep(1)
            ins.run()
    

    测试:
    我们这里可以开启多个server分别监听不同的端口,并且让客户端在执行的时候打印出请求的服务是哪一个
    然后在客户端运行的过程中,关闭某一个server,这时候我们就可以看到客户端的终端会打印出”更新了“的字样,并且后续的请求都不会再去请求那个已经被我们关闭的服务,代表测试成功。

    相关文章

      网友评论

          本文标题:使用kazoo改写grpc程序

          本文链接:https://www.haomeiwen.com/subject/sgdbdctx.html