一个整体的项目正在逐步分离成各个服务,系统间的通讯采用消息队列传递,消息队列选择了RabbitMQ
服务器上如何安装rabbit这里就不说了,看官网或者随便找个教程。
这里简单讲述python操作rabbit的过程
一. python操作
1. 安装
官网推荐的库是pika,名字很好,总想接个qiu
pip install -i https://pypi.doubanio.com/simple pika
2. 连接
connection = BlockingConnection(ConnectionParameters(HOST, PORT, credentials=PlainCredentials(USERNAME, PASSWORD)))
3. Publisher
封装成一个class,方便调用
class RabbitMqPublisher(object):
# 必要参数
def __init__(self, host, port, username, passwd):
self.host = host
self.port = port
self.username = username
self.passwd = passwd
self.connection = None
# 初始化实例(建立连接)
def init_instance(self):
self.connection = BlockingConnection(
ConnectionParameters(
self.host,
self.port,
credentials=PlainCredentials(self.username, self.passwd)))
self.chk_instance_init()
# 检查是否成功建立连接
def chk_instance_init(self):
assert self.connection is not None
# 测试连接
def test_connect(self):
return not self.connection.is_closed
# 发布消息
def publish(self, exchange_type, exchange_name, mq_body, routing_key=""):
self.init_instance()
channel = self.connection.channel()
# 有两种发布队列的方式(后续普及相关知识)
# 1 指定队列
# channel.queue_declare(queue=mq_name)
# channel.basic_publish(exchange="", routing_key=mq_name, body=mq_body)
# 2 指定exchange
channel.exchange_declare(
exchange=exchange_name, exchange_type=exchange_type, passive=False, durable=True
) # passive=True: 不会新创建exchange方法
channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=mq_body)
self.close()
# 关闭连接
def close(self):
self.connection.close()
4. 调用
publisher = RabbitMqPublisher(host=, port=, username=, passwd=)
5. Consumer
class RabbitMqConsumer(object):
def __init__(self, host, port, username, passwd):
self.host = host
self.port = port
self.username = username
self.passwd = passwd
self.connection = None
def init_instance(self):
self.connection = BlockingConnection(
ConnectionParameters(
self.host,
self.port,
credentials=PlainCredentials(self.username, self.passwd)))
self.chk_instance_init()
def chk_instance_init(self):
assert self.connection is not None
def test_connect(self):
return not self.connection.is_closed
def consume(self, queue_name="han-log", exchange_name="", need_bind=False):
self.init_instance()
channel = self.connection.channel()
def callback(ch, method, properties, body):
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) +
": [x] Received %r" % body, flush=True)
res = json.loads(body)
if res["sender"] == "HAN":
file = open(
"/var/log/mshan/daily-{}.log".format(
time.strftime("%Y-%m-%d", time.localtime(time.time()))
), "a"
)
file.write("【{} verson-{}】INFO: {}".format(res.get("updated"), res.get("ver"), res.get("val")))
file.write("\n")
file.close()
ch.basic_ack(delivery_tag=method.delivery_tag)
# 判断是否需要把exchang绑定queue(因为上述publish针对exchange,consume针对queue)
if need_bind:
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key="")
channel.basic_consume(callback, queue=queue_name, no_ack=False)
# Start event loop
print('Receiving return shipment events...', flush=True)
channel.start_consuming()
def close(self):
self.connection.close()
二. rabbitmq相关知识
-
Publisher
从发布者的角度而言,可以指定两种发送queue的方式,一种是发送到exchange,由exchange决定分发到哪些队列;另一种是直接发送。 -
流程图
rbmq流程.jpg -
routing_key
这个概念没有在图中画出
事实上这个可以理解成图中每一个Q的name
每个exchange应该绑定某些Q(下述),这个绑定就是通过routing_key
每个publisher发布的消息中也应该指定routing_key
不过某些特定规则下publisher的routing_key会被忽略(下述) -
exchange
分发规则,可以定义名称exchange
,类型exchange_type
,(如果没有)是否创建passive
,持久化durable
channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, passive=False, durable=True)
其中exchange_type
分三种:
- fanout
默认选择项,即无规则
单纯的分发到exchange明确指定的queue中
routing_key在此规则内不起作用 - direct
一对一规则
publisher除了指定exchange,还要指定routing_key
只有当该exahnge绑定的队列中包含此routing_key的时候,消息才会被queue接受。 -
topic
灵活规则
exchange绑定队列时可以绑定按照某种规则绑定
如图:
rbmq_topic.png
- 服务器端操作
1.查看所有的exchangerabbitmqctl list_exchanges
2.查看所有的队列rabbitmqctl list_queues
网友评论