美文网首页程序员
RabbitMQ -- part3 [Publish/Subsc

RabbitMQ -- part3 [Publish/Subsc

作者: bloke_anon | 来源:发表于2018-02-28 15:07 被阅读23次
上节介绍了将每个消息投递给一个consumer,本节介绍Publish/Subscribe(推送、订阅),将一个消息投递到多个consumer,实质是通过广播形式将消息传递给所有consumer

RabbitMQ消息模块的核心理念是producer不直接将消息交付到队列,并且producer通常不知道将消息交付到哪个队列中,producer仅仅将消息发送到 exchange。exchange其实就是一侧接受producer交付的消息,另一侧将消息推送到队列中。exchange必须知道接收的消息内容,然后通过 exchange type 来定义是将消息推送到指定队列、推送到多个队列还是直接丢弃消息。

exchanges

1. Exchange

exchange 的有效类型有:direct,topic,headers和fanout

这里主要关注"fanout",创建并命名为"logs":

channel.exchange_declare(exchange='logs', exchange_type='fanout')

fanout exchange,从字面也是就能知道,它广播所有的消息到它所知道的队列。

可以通过 rabbitmqctl 命令列出当前的exchange:rabbitmqctl list_exchanges

空字符("")代表默认exchange,消息将会交付到 routing_key 指定的队列中,前提是此队列必须存在。

2. Temporary queues

  • 1.连接到Rabbit的一个临时队列(使用随机名称命名队列),在声明队列时不需要指定queue参数:result = channel.queue_declare()

  • 2.一旦consumer连接关闭,则删除队列。设置 exclusive 标志:result = channel.queue_declare(exclusive=True)

3. Bindings

bindings

上面已经创建了"fanout exchange"和queue,现在需要告诉exchange将消息发送到queue。(关联exchange和queue的动作叫: binding)

channel.queue_bind(exchange='logs', queue=result.method.queue) # result.method.queue 是一个随机的队列名,例如:amq.gen-JzTY20BRgKO-HjmUJj0wLg

列出"bindings": rabbitmqctl list_bindings


构建一个日志系统,由两个程序组成,第一个发送日志消息,第二个接受并且打印
python-three-overall
  1. emit_log.py 提交日志
#!/usr/bin/env python3
# coding=utf-8

import pika

message = ' '.join(sys.argv[1:]) or 'Hello World!'

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# 声明"logs" exchange,类型为fanout(广播)
channel.exchange_declare(exchange='logs', exchange_type='fanout')

channel.basic_publish(exchange='logs',
                      routing_key='',   # fanout模式,需要提供此参数,但是忽略此值
                      body=message,
)

print("[x] Sent '%s'" % message)

connection.close()
  1. receive_logs.py 接受并打印日志
#!/usr/bin/env python3
# coding=utf-8

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# 声明"logs" exchange,类型为fanout(广播)
channel.exchange_declare(exchange="logs", exchange_type="fanout")

# 使用RabbitMQ生成随机名的临时队列,设置exclusive标志,当consumer断开连接时,销毁队列
result = channel.queue_declare(exclusive=True)

def callback(ch, method, properties, body):
    print('[x] Received %r' % body)

# result.method.queue 队列名称
queue_name = result.method.queue

# 绑定exchange和queue
channel.queue_bind(exchange='logs', queue=queue_name)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

print('[*] Waiting for messgaes. To exit press CTRL+C')

channel.start_consuming()

参考文档:http://www.rabbitmq.com/tutorials/tutorial-three-python.html


相关文章

网友评论

    本文标题:RabbitMQ -- part3 [Publish/Subsc

    本文链接:https://www.haomeiwen.com/subject/fxzbxftx.html