Topic exchange
上节介绍了通过使用direct exchange,将消息推送到binding的队列,但是有局限性,例如在收取日志时,想收取error级别的log,不收取info级别的log等,此时需要使用topic exchange。
topic exchange中routing_key的值是一列通过点(dots)符号分隔的单词,例如:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit" 可以由多个单词组成,长度上限是255字节。
topic exchange允许使用通配符:
- *(star) 替代一个单词
- #(hash) 替代零个或多个单词
如图:
以上图为例,创建三个bindings:Q1绑定 "*.orange.*",表示接受有关orange相关的消息;Q2绑定 "*.*.rabbit" 和 "lazy.#",表示接受有关rabbits和lazy相关的消息。
routing key举例:
- 消息 "quick.orange.rabbit" 将会同时被推送到两个队列中
- 消息 "lazy.orange.elephant" 也会被推送到两个队列中
- 消息 "quick.orange.fox" 会推送到Q1
- 消息 "lazy.brown.fox" 会推送到Q2
- 消息 "lazy.pink.rabbit" 会推送到Q2,虽然匹配到两个规则,但也只推送一次
- 消息 "quick.brown.fox" 将会被丢弃,因为没有匹配到任何规则
- 消息 "orange" 或 "quick.orange.male.rabbit" 没有匹配项,会被丢弃
- 消息 "lazy.orange.male.rabbit" 匹配到 "lazy.#" 规则,将会推送到Q2
如果队列的 binding key 为 "#",则会像 fanout exchange一样,接收所有消息;如果 binding key 中没有指定 "*"(star)和 "#"(hash),则会向 direct exchange一样。
实例代码
- 编辑 emit_log_topic.py 提交日志
#!/usr/bin/env python3
# coding=utf-8
import pika
import sys
message = ' '.join(sys.argv[1:]) or 'Hello World!'
severity = sys.argv[1] if len(sys.argv) > 2 else 'anonymouse.info'
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 声明使用 exchange type 为 topic
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.basic_publish(exchange='topic_logs',
routing_key=severity,
body=message,
)
print("[x] Sent '%s'" % message)
connection.close()
- 编辑 receive_logs_topic.py 接收日志
#!/usr/bin/env python3
# coding=utf-8
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 声明 topic type
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
for severity in severities:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=severity)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
print('[*] Waiting for messgaes. To exit press CTRL+C')
channel.start_consuming()
- 执行
- 启动emit_log_topic.py
./emit_log_topic.py "kern.critical" "A critical kernel error"
- 启动consumer1:
./receive_logs_topic.py "#"
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'kern.critical A critical kernel error'
- 启动consumer2:
./receive_logs_topic.py "kern.*"
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'kern.critical A critical kernel error'
- 启动soncumer3:
/receive_logs_topic.py "*.critical"
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'kern.critical A critical kernel error'
- 启动consumer4:
./receive_logs_topic.py "kern.*" "*.critical"
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'kern.critical A critical kernel error'
参考文档: http://www.rabbitmq.com/tutorials/tutorial-five-python.html
网友评论