美文网首页RabbitMQRabbitMQ
RabbitMQ+GRPC的快速使用(1)

RabbitMQ+GRPC的快速使用(1)

作者: bwisgood | 来源:发表于2019-08-02 02:15 被阅读0次

    使用背景

    之前写的同步发送通知随着业务量的增长,已经不再适用,所以快速实现一个基本的rq队列+grpc的方式来投递通知数据并交给rq的worker去调用grpc的服务。
    但是之前调用的地方太多了,所以最好还是以patch的方式去修改

    思路

    原有的结构大致为图1所示


    图1

    首先flask调用grpc再由grpc请求微信服务器发送消息,然后由微信响应请求后返回通知结果给grpc,grpc再返回结果给flask最终返回给客户端,所以除非等到grpc返回调用结果,否则将会一直阻塞
    现在则为


    图2
    flask投递消息到队列中去就就结束了,直接返回到客户端,这里就不会阻塞,而是让监听rabbitMQ的worker去执行

    这里暂时只创建一个队列去分发所有类型的通知所以message的格式需要固定
    {"method":"method_name", "data":{}},客户端调用publish传入对应的参数即可

    # client.py
    import pika
    import pickle
    
    
    class RabbitClient(object):
        def __init__(self, host="localhost", port=5672, routing_key=None):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=host, port=port))
            self.channel = self.connection.channel()
            self.routing_key = routing_key
    
        def publish(self, method_name, **kwargs):
            message = self.package(method_name, **kwargs)
    
            self.channel.basic_publish(exchange='', routing_key=self.routing_key, body=message)
    
        def package(self, method_name, **kwargs):
            temp = {"method": method_name}
            temp.update(data=kwargs)
            return pickle.dumps(temp)
    
    # 这里是调用的工具module,原来的方式已经注释
    from apps import rq
    
    
    # def sen_message_test(user_id, message):
    #     """
    #
    #     :param user_id:
    #     :param message: {"title":"","message":""}
    #     :return:
    #     """
    #     with grpc.insecure_channel("{0}:{1}".format(_HOST, _PORT)) as channel:
    #         client = send_server_pb2_grpc.SendServiceStub(channel=channel)
    #         response = client.SendMessage(send_server_pb2.SendMessageParam(user_id=user_id, message=json.dumps(message)))
    #     print("received: " + str(response))
    
    def sen_message_test(user_id, message):
        rq.publish("sen_message_test", user_id=user_id, message=message)
    
    def debt_remind_test(user_id=None, bill_id=None):
        rq.publish("debt_remind_test", user_id, bill_id)
    
    def repair_remind_test(user_id=None, repair_id=None):
        rq.publish("repair_remind_test", user_id=user_id, repair_id=repair_id)
    
    # 太多了就不全列出来了,总之就是要保证原来的业务逻辑代码还能用
    
    # worker.py
    import pika
    import pickle
    
    
    class RabbitServer(object):
        def __init__(self, host="localhost", port=5672, queue=None):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=host, port=port))
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=queue, durable=True)
    
            self.channel.basic_consume(on_message_callback=self.callback, queue=queue, auto_ack=True)
            self.dispatcher = RpcMethodDispatcher()
            self.setup = self.dispatcher.setup
    
        def callback(self, ch, method, properties, body):
            body = pickle.loads(body)
            print(body)
    
            func = self.dispatcher.dispatch(body.get("method"))
            if not func:
                return
            try:
                func(**body.get("data"))
            except Exception as e:
                print(e)
    
        def run(self):
            print("wait")
            self.channel.start_consuming()
    
    
    class RpcMethodDispatcher(object):
        def __init__(self):
            self.map = []
    
        def setup(self, name):
            # 和message中的method相互对应类似于@app.route("/"),将所有路由添加过来
            def deco(f):
                self.map.append(MethodMap(name, f))
    
                def wrapper(*args, **kwargs):
                    return f(*args, **kwargs)
    
                return wrapper
    
            return deco
    
        def dispatch(self, name):
            for i in self.map:
                if i.name == name:
                    return i.method
    
    
    class MethodMap(object):
        def __init__(self, name, method):
            self.name = name
            self.method = method
    
    
    server = RabbitServer(queue="task_queue")
    
    if __name__ == '__main__':
        server.run()
    
    

    给标题后面加了个(1),我知道这玩意儿很快就会还要修改
    可能看到这里就会有同学问了,为啥不new一个thread去执行嘞?


    image.png

    相关文章

      网友评论

        本文标题:RabbitMQ+GRPC的快速使用(1)

        本文链接:https://www.haomeiwen.com/subject/mfzhdctx.html