使用背景
之前写的同步发送通知随着业务量的增长,已经不再适用,所以快速实现一个基本的rq队列+grpc的方式来投递通知数据并交给rq的worker去调用grpc的服务。
但是之前调用的地方太多了,所以最好还是以patch的方式去修改
思路
原有的结构大致为图1所示
图1
首先flask调用grpc再由grpc请求微信服务器发送消息,然后由微信响应请求后返回通知结果给grpc,grpc再返回结果给flask最终返回给客户端,所以除非等到grpc返回调用结果,否则将会一直阻塞
现在则为
图2
flask投递消息到队列中去就就结束了,直接返回到客户端,这里就不会阻塞,而是让监听rabbitMQ的worker去执行
这里暂时只创建一个队列去分发所有类型的通知所以message的格式需要固定
{"method":"method_name", "data":{}}
,客户端调用publish传入对应的参数即可
# client.py
import pika
import pickle
class RabbitClient(object):
def __init__(self, host="localhost", port=5672, routing_key=None):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=host, port=port))
self.channel = self.connection.channel()
self.routing_key = routing_key
def publish(self, method_name, **kwargs):
message = self.package(method_name, **kwargs)
self.channel.basic_publish(exchange='', routing_key=self.routing_key, body=message)
def package(self, method_name, **kwargs):
temp = {"method": method_name}
temp.update(data=kwargs)
return pickle.dumps(temp)
# 这里是调用的工具module,原来的方式已经注释
from apps import rq
# def sen_message_test(user_id, message):
# """
#
# :param user_id:
# :param message: {"title":"","message":""}
# :return:
# """
# with grpc.insecure_channel("{0}:{1}".format(_HOST, _PORT)) as channel:
# client = send_server_pb2_grpc.SendServiceStub(channel=channel)
# response = client.SendMessage(send_server_pb2.SendMessageParam(user_id=user_id, message=json.dumps(message)))
# print("received: " + str(response))
def sen_message_test(user_id, message):
rq.publish("sen_message_test", user_id=user_id, message=message)
def debt_remind_test(user_id=None, bill_id=None):
rq.publish("debt_remind_test", user_id, bill_id)
def repair_remind_test(user_id=None, repair_id=None):
rq.publish("repair_remind_test", user_id=user_id, repair_id=repair_id)
# 太多了就不全列出来了,总之就是要保证原来的业务逻辑代码还能用
# worker.py
import pika
import pickle
class RabbitServer(object):
def __init__(self, host="localhost", port=5672, queue=None):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=host, port=port))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue, durable=True)
self.channel.basic_consume(on_message_callback=self.callback, queue=queue, auto_ack=True)
self.dispatcher = RpcMethodDispatcher()
self.setup = self.dispatcher.setup
def callback(self, ch, method, properties, body):
body = pickle.loads(body)
print(body)
func = self.dispatcher.dispatch(body.get("method"))
if not func:
return
try:
func(**body.get("data"))
except Exception as e:
print(e)
def run(self):
print("wait")
self.channel.start_consuming()
class RpcMethodDispatcher(object):
def __init__(self):
self.map = []
def setup(self, name):
# 和message中的method相互对应类似于@app.route("/"),将所有路由添加过来
def deco(f):
self.map.append(MethodMap(name, f))
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
return deco
def dispatch(self, name):
for i in self.map:
if i.name == name:
return i.method
class MethodMap(object):
def __init__(self, name, method):
self.name = name
self.method = method
server = RabbitServer(queue="task_queue")
if __name__ == '__main__':
server.run()
给标题后面加了个(1),我知道这玩意儿很快就会还要修改
可能看到这里就会有同学问了,为啥不new一个thread去执行嘞?
image.png
网友评论