美文网首页程序员
RabbitMQ实战4.发布与订阅

RabbitMQ实战4.发布与订阅

作者: 闲睡猫 | 来源:发表于2018-08-10 08:48 被阅读151次

    继上篇 RabbitMQ实战3.公平调度

    RabbitMQ并非直接将消息投递到队列中,而是要经过交换机,交换机再与队列绑定。那么,什么是交换机? 如何通过交换机与队列的绑定实现发布与订阅功能?

    交换机

    先来了解前面课程中用到的交换机

    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message)
    

    代码中的exchange参数就是指交换机,为空表示默认交换机或者匿名交换机,这种交换机有个特点,即routing_key路由指定的是队列名称。这句代码可以理解为:RabbitMQ经由默认的交换机将消息投递到task_queue队列中。

    交换机还有以下几种类型:

    • 直连交换机(direct)
    • 主题交换机(topic)
    • 头交换机 (headers)
    • 扇型交换机(fanout)

    显示交换机列表

    ☁  rabbitMq [master] ⚡ rabbitmqctl list_exchanges
    Listing exchanges for vhost / ...
    amq.direct  direct
    amq.rabbitmq.trace  topic
    amq.topic   topic
    amq.match   headers
    amq.headers headers
    amq.fanout  fanout
    

    本篇主要阐述扇形交换机

    临时队列

    之前的实战文章中,生产者和消费者都是共用同一个具名的队列。本篇要实现的是发布与订阅功能,即生产者发布消息后,不同消费者是从不同的队列中获取消息。这就不可能在生产者中指定具名的队列名称,因为不可能预先知道有多少个队列。这种情况就需要用临时队列。

    临时队列:当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们只需要在调用queue_declare方法的时候,不提供queue参数就可以了:

    result = channel.queue_declare() # 创建了一个临时队列
    

    通过result.method.queue获得已经生成的随机队列名

    当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。exclusive标识符即可达到此目的。

    result = channel.queue_declare(exclusive=True)
    

    生产者投递消息时不需要指定队列,只需要指定类型为扇形的交换机,扇形交换机会将消息推送到所有的队列中

    发布与订阅功能实现

    交换机

    新建 emit_log.py

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    # 声明交换机
    channel.exchange_declare(exchange='logs',  # 交换机名称
                             exchange_type='fanout')  # 交换机类型
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',  # 指定交换机
                          routing_key='',  # 指定队列,会被fanout交换机覆盖,因此置为空
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    

    新建 receive_logs.py

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    result = channel.queue_declare(exclusive=True)  # 声明临时队列
    queue_name = result.method.queue  # 获取队列名称
    
    # 交换机与队列绑定
    channel.queue_bind(exchange='logs',  # 声明要绑定的交换机
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    

    执行结果

    消费者1:直接将结果显示在终端

    ☁  rabbitMq [master] ⚡ python receive_logs.py
     [*] Waiting for logs. To exit press CTRL+C
     [x] b'info: Hello World!'
    

    消费者2:将结果写到文件

    ☁  rabbitMq [master] ⚡ python receive_logs.py > logs_from_rabbit.log
    

    发送消息

    ☁  rabbitMq [master] ⚡ python emit_log.py
     [x] Sent 'info: Hello World!'
    

    查看队列的绑定信息

    ☁  rabbitMq [master] ⚡ rabbitmqctl list_bindings | grep logs
    logs    exchange    amq.gen-SOqwANvv-M0ACC1lWV_MpA  queue   amq.gen-SOqwANvv-M0ACC1lWV_MpA  []
    logs    exchange    amq.gen-eAjX2hrcOsurdyoRYJvmow  queue   amq.gen-eAjX2hrcOsurdyoRYJvmow  []
    

    查看日志文件

    ☁  rabbitMq [master] ⚡ cat logs_from_rabbit.log
     [*] Waiting for logs. To exit press CTRL+C
     [x] b'info: Hello World!'
    

    流程总结

    1. 定义扇形交换机,发送到所有队列

    2. 消费者声明临时队列,将队列与交换机绑定,生产者发送消息,交换机将消息发送到所有绑定的队列

    3. 不同队列有不同的消费者,每个消费者对应不同的队列

    参考文档

    相关文章

      网友评论

        本文标题:RabbitMQ实战4.发布与订阅

        本文链接:https://www.haomeiwen.com/subject/vaohbftx.html