上节介绍了将每个消息投递给一个consumer,本节介绍Publish/Subscribe(推送、订阅),将一个消息投递到多个consumer,实质是通过广播形式将消息传递给所有consumer
RabbitMQ消息模块的核心理念是producer不直接将消息交付到队列,并且producer通常不知道将消息交付到哪个队列中,producer仅仅将消息发送到 exchange。exchange其实就是一侧接受producer交付的消息,另一侧将消息推送到队列中。exchange必须知道接收的消息内容,然后通过 exchange type 来定义是将消息推送到指定队列、推送到多个队列还是直接丢弃消息。
exchanges1. Exchange
exchange 的有效类型有:direct,topic,headers和fanout
这里主要关注"fanout",创建并命名为"logs":
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout exchange,从字面也是就能知道,它广播所有的消息到它所知道的队列。
可以通过 rabbitmqctl 命令列出当前的exchange:
rabbitmqctl list_exchanges
空字符("")代表默认exchange,消息将会交付到 routing_key 指定的队列中,前提是此队列必须存在。
2. Temporary queues
-
1.连接到Rabbit的一个临时队列(使用随机名称命名队列),在声明队列时不需要指定
queue
参数:result = channel.queue_declare()
-
2.一旦consumer连接关闭,则删除队列。设置
exclusive
标志:result = channel.queue_declare(exclusive=True)
3. Bindings
bindings上面已经创建了"fanout exchange"和queue,现在需要告诉exchange将消息发送到queue。(关联exchange和queue的动作叫: binding)
channel.queue_bind(exchange='logs', queue=result.method.queue) # result.method.queue 是一个随机的队列名,例如:amq.gen-JzTY20BRgKO-HjmUJj0wLg
列出"bindings":
rabbitmqctl list_bindings
构建一个日志系统,由两个程序组成,第一个发送日志消息,第二个接受并且打印
python-three-overall- emit_log.py 提交日志
#!/usr/bin/env python3
# coding=utf-8
import pika
message = ' '.join(sys.argv[1:]) or 'Hello World!'
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 声明"logs" exchange,类型为fanout(广播)
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs',
routing_key='', # fanout模式,需要提供此参数,但是忽略此值
body=message,
)
print("[x] Sent '%s'" % message)
connection.close()
- receive_logs.py 接受并打印日志
#!/usr/bin/env python3
# coding=utf-8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 声明"logs" exchange,类型为fanout(广播)
channel.exchange_declare(exchange="logs", exchange_type="fanout")
# 使用RabbitMQ生成随机名的临时队列,设置exclusive标志,当consumer断开连接时,销毁队列
result = channel.queue_declare(exclusive=True)
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
# result.method.queue 队列名称
queue_name = result.method.queue
# 绑定exchange和queue
channel.queue_bind(exchange='logs', queue=queue_name)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
print('[*] Waiting for messgaes. To exit press CTRL+C')
channel.start_consuming()
参考文档:http://www.rabbitmq.com/tutorials/tutorial-three-python.html
网友评论