美文网首页程序员
RabbitMQ -- part5 [Topic]

RabbitMQ -- part5 [Topic]

作者: bloke_anon | 来源:发表于2018-03-02 13:49 被阅读19次
    Topic exchange

    上节介绍了通过使用direct exchange,将消息推送到binding的队列,但是有局限性,例如在收取日志时,想收取error级别的log,不收取info级别的log等,此时需要使用topic exchange。
    topic exchange中routing_key的值是一列通过点(dots)符号分隔的单词,例如:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit" 可以由多个单词组成,长度上限是255字节。

    topic exchange允许使用通配符:

    1. *(star) 替代一个单词
    2. #(hash) 替代零个或多个单词

    如图:

    以上图为例,创建三个bindings:Q1绑定 "*.orange.*",表示接受有关orange相关的消息;Q2绑定 "*.*.rabbit" 和 "lazy.#",表示接受有关rabbits和lazy相关的消息。

    routing key举例:

    • 消息 "quick.orange.rabbit" 将会同时被推送到两个队列中
    • 消息 "lazy.orange.elephant" 也会被推送到两个队列中
    • 消息 "quick.orange.fox" 会推送到Q1
    • 消息 "lazy.brown.fox" 会推送到Q2
    • 消息 "lazy.pink.rabbit" 会推送到Q2,虽然匹配到两个规则,但也只推送一次
    • 消息 "quick.brown.fox" 将会被丢弃,因为没有匹配到任何规则
    • 消息 "orange" 或 "quick.orange.male.rabbit" 没有匹配项,会被丢弃
    • 消息 "lazy.orange.male.rabbit" 匹配到 "lazy.#" 规则,将会推送到Q2

    如果队列的 binding key 为 "#",则会像 fanout exchange一样,接收所有消息;如果 binding key 中没有指定 "*"(star)和 "#"(hash),则会向 direct exchange一样。

    实例代码
    • 编辑 emit_log_topic.py 提交日志
    #!/usr/bin/env python3
    # coding=utf-8
    
    import pika
    import sys
    
    message = ' '.join(sys.argv[1:]) or 'Hello World!'
    severity = sys.argv[1] if len(sys.argv) > 2 else 'anonymouse.info'
    
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    
    # 声明使用 exchange type 为 topic
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    
    channel.basic_publish(exchange='topic_logs',
                          routing_key=severity,
                          body=message,
    )
    
    print("[x] Sent '%s'" % message)
    
    connection.close()
    
    • 编辑 receive_logs_topic.py 接收日志
    #!/usr/bin/env python3
    # coding=utf-8
    
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    
    # 声明 topic type
    channel.exchange_declare(exchange="topic_logs", exchange_type="topic")
    
    result = channel.queue_declare(exclusive=True)
    
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    
    if not severities:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)
    
    def callback(ch, method, properties, body):
        print('[x] Received %r' % body)
    
    for severity in severities:
        channel.queue_bind(exchange='topic_logs',
                          queue=queue_name,
                          routing_key=severity)
    
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    
    print('[*] Waiting for messgaes. To exit press CTRL+C')
    
    channel.start_consuming()
    
    • 执行
    1. 启动emit_log_topic.py

    ./emit_log_topic.py "kern.critical" "A critical kernel error"

    1. 启动consumer1:
    ./receive_logs_topic.py "#"
    [*] Waiting for messgaes. To exit press CTRL+C
    [x] Received b'kern.critical A critical kernel error'
    
    1. 启动consumer2:
    ./receive_logs_topic.py "kern.*"
    [*] Waiting for messgaes. To exit press CTRL+C
    [x] Received b'kern.critical A critical kernel error'
    
    1. 启动soncumer3:
    /receive_logs_topic.py "*.critical"
    [*] Waiting for messgaes. To exit press CTRL+C
    [x] Received b'kern.critical A critical kernel error'
    
    1. 启动consumer4:
    ./receive_logs_topic.py "kern.*" "*.critical"
    [*] Waiting for messgaes. To exit press CTRL+C
    [x] Received b'kern.critical A critical kernel error'
    

    参考文档: http://www.rabbitmq.com/tutorials/tutorial-five-python.html


    相关文章

      网友评论

        本文标题:RabbitMQ -- part5 [Topic]

        本文链接:https://www.haomeiwen.com/subject/avlaxftx.html