美文网首页程序员
RabbitMQ -- part3 [Publish/Subsc

RabbitMQ -- part3 [Publish/Subsc

作者: bloke_anon | 来源:发表于2018-02-28 15:07 被阅读23次
    上节介绍了将每个消息投递给一个consumer,本节介绍Publish/Subscribe(推送、订阅),将一个消息投递到多个consumer,实质是通过广播形式将消息传递给所有consumer

    RabbitMQ消息模块的核心理念是producer不直接将消息交付到队列,并且producer通常不知道将消息交付到哪个队列中,producer仅仅将消息发送到 exchange。exchange其实就是一侧接受producer交付的消息,另一侧将消息推送到队列中。exchange必须知道接收的消息内容,然后通过 exchange type 来定义是将消息推送到指定队列、推送到多个队列还是直接丢弃消息。

    exchanges

    1. Exchange

    exchange 的有效类型有:direct,topic,headers和fanout

    这里主要关注"fanout",创建并命名为"logs":

    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    fanout exchange,从字面也是就能知道,它广播所有的消息到它所知道的队列。

    可以通过 rabbitmqctl 命令列出当前的exchange:rabbitmqctl list_exchanges

    空字符("")代表默认exchange,消息将会交付到 routing_key 指定的队列中,前提是此队列必须存在。

    2. Temporary queues

    • 1.连接到Rabbit的一个临时队列(使用随机名称命名队列),在声明队列时不需要指定queue参数:result = channel.queue_declare()

    • 2.一旦consumer连接关闭,则删除队列。设置 exclusive 标志:result = channel.queue_declare(exclusive=True)

    3. Bindings

    bindings

    上面已经创建了"fanout exchange"和queue,现在需要告诉exchange将消息发送到queue。(关联exchange和queue的动作叫: binding)

    channel.queue_bind(exchange='logs', queue=result.method.queue) # result.method.queue 是一个随机的队列名,例如:amq.gen-JzTY20BRgKO-HjmUJj0wLg

    列出"bindings": rabbitmqctl list_bindings


    构建一个日志系统,由两个程序组成,第一个发送日志消息,第二个接受并且打印
    python-three-overall
    1. emit_log.py 提交日志
    #!/usr/bin/env python3
    # coding=utf-8
    
    import pika
    
    message = ' '.join(sys.argv[1:]) or 'Hello World!'
    
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    
    # 声明"logs" exchange,类型为fanout(广播)
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
    channel.basic_publish(exchange='logs',
                          routing_key='',   # fanout模式,需要提供此参数,但是忽略此值
                          body=message,
    )
    
    print("[x] Sent '%s'" % message)
    
    connection.close()
    
    1. receive_logs.py 接受并打印日志
    #!/usr/bin/env python3
    # coding=utf-8
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    
    # 声明"logs" exchange,类型为fanout(广播)
    channel.exchange_declare(exchange="logs", exchange_type="fanout")
    
    # 使用RabbitMQ生成随机名的临时队列,设置exclusive标志,当consumer断开连接时,销毁队列
    result = channel.queue_declare(exclusive=True)
    
    def callback(ch, method, properties, body):
        print('[x] Received %r' % body)
    
    # result.method.queue 队列名称
    queue_name = result.method.queue
    
    # 绑定exchange和queue
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    
    print('[*] Waiting for messgaes. To exit press CTRL+C')
    
    channel.start_consuming()
    

    参考文档:http://www.rabbitmq.com/tutorials/tutorial-three-python.html


    相关文章

      网友评论

        本文标题:RabbitMQ -- part3 [Publish/Subsc

        本文链接:https://www.haomeiwen.com/subject/fxzbxftx.html