美文网首页MQpython开发
RabbitMQ pika简单使用

RabbitMQ pika简单使用

作者: 宝宝家的隔壁老王 | 来源:发表于2017-10-06 07:25 被阅读47次

    MQ 全称为 Message Queue, 是一种应用程序对应用程序的通信方法。MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ 遵循了 AMQP 协议的具体实现和产品。

    整篇代码撸下来预计耗时1.5h。

    参考博主:anzhsoft
    官网


    闲聊
    • 什么是 RabbitMQ ?
    • 为什么要用 RabbitMQ ?
    • RabbitMQ 怎么用?

    一、What RabbitMQ?

    MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ 和 JMS 类似,但不同的是 JMS 是 SUN JAVA 消息中间件服务的一个标准和API定义,而 MQ 则是遵循了 AMQP协议的具体实现和产品。

    这个系统架构图版权属于sunjun041640
    • 1、RabbitMQ Server
      • RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是维护一条从 Producer 到 Consumer 的路线。
    • 2、Exchange
      • where producers publish their messages(发送方发布消息的地方)
    • 3、Queue
      • where the messages end up and are received by consumers(接收方获取消息的地方)
    • 4、Producer(Client A, Client B)
      • 数据的发送方
    • 5、Consumer(Client 1, Client 2, Client 3)
      • 数据的接收方
    • 6、Connection
      • 就是一个 TCP 的连接。Producer 和 Consumer 都是通过 TCP 连接到 RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个 TCP 连接。
    • 7、Channel
      • 虚拟连接。它建立在上述的 TCP 连接中。数据流动都是在 Channel 中进行的。也就是说,一般情况是程序起始建立TCP 连接,第二步就是建立这个 Channel
    • 8、Bindings
      • Bindings are how the messages get routed from the exchange to particular queues.
    二、Why RabbitMQ

    对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块如何通信呢?可以使用 socket,或者 url 请求,但是还有很多问题需要解决,如:

    • 1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失?
    • 2)如何降低发送者和接收者的耦合度?
    • 3)如何让 Priority 高的接收者先接到数据?
    • 4)如何做到 load balance?有效均衡接收者的负载?
    • 5)如何有效的将数据发送到相关的接收者?也就是说将接收者 subscribe 不同的数据,如何做有效的 filter。
    • 6)如何做到可扩展,甚至将这个通信模块发到 cluster 上?
    • 7)如何保证接收者接收到了完整,正确的数据
      AMDQ 协议解决了以上的问题,而 RabbitMQ 实现了 AMQP
    三、How RabbitMQ

    一套 MQ 完整流程如下:

    首先将 RabbitMQ 服务启动

    Producer

    • 1、创建一个 connection
    • 2、在 connection 基础上创建一个频道 channel
    • 3、在频道 channel 上声明一个 exchange,参数为 exchange 的类型和名称
    • 4、在频道 channel 上发布消息,参数为 exchange 的名称以及路由 (routing_key) 以及消息体
    • 5、关闭 connection

    Consumer

    • 1、创建一个 connection
    • 2、在 connection 基础上创建一个 channel
    • 3、在频道 channel 上声明一个 exchange,参数为 exchange 的类型和名称
    • 4、在频道 channel 上声明一个 queue
    • 5、将 queue 绑定到声明的 exchange 上
    • 6、channel 开始监听对应 queue 上的消息,同时设置获取消息的回调
    四、example
    • 1、直接使用 queue ,难度等级 ★☆☆☆☆

    producer.py

    import pika  
    
    if __name__ == '__main__':
        # 创建一个connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
        channel = connection.channel()
        
        # 声明一个queue
        channel.queue_declare(queue='hello')  
        
        # exchange为空的时候,routing_key就是指定的queue值
        channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')  
        print(" [x] Sent 'Hello World!'")
        # 关闭连接
        connection.close()
    

    consumer.py

    import pika  
    
    def callback(ch, method, properties, body):  
        print(" [x] Received %r" % (body,))
    
    if __name__ == '__main__':
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
        # 创建频道
        channel = connection.channel()  
        # 声明queue
        channel.queue_declare(queue='hello')  
        
        print(' [*] Waiting for messages. To exit press CTRL+C')
    
        # 收到指定消息的回调设置 
        channel.basic_consume(callback, queue='hello', no_ack=True)  
        # 开始循环监听 
        channel.start_consuming()
    
    • 2、消息确认,难度等级 ★☆☆☆☆

    producer.py

    import pika
    import sys
    
    if __name__ == '__main__':
    
        message = ' '.join(sys.argv[1:]) or "Hello World!"
        # 创建一个 connection
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        # 创建一个 channel
        channel = connection.channel()
        # 声明一个queue
        channel.queue_declare(queue='hello')
    
        # 发布消息,exchange 为空的情况下,routing_key 值就是指定的 queue 名字,即将消息直接发送到指定的 queue
        channel.basic_publish(exchange='', routing_key='hello', body=str(message))
        print(" [x] Sent {}".format(message))
        connection.close()
    

    consumer.py

    import pika  
    import time
    
    def callback(ch, method, properties, body):  
        print(" [x] Received %r" % body)
        time.sleep(str(body).count('.'))
        ch.basic_ack(delivery_tag = method.delivery_tag)
            
    if __name__ == '__main__':
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
        channel = connection.channel()  
          
        channel.queue_declare(queue='hello')  
          
        print(' [*] Waiting for messages. To exit press CTRL+C')
          
        channel.basic_consume(callback, queue='hello',no_ack=False)  
        
        channel.start_consuming()
    
    • 3、使用 fanout 类型 exchange,难度等级 ★☆☆☆☆

    producer.py

    import pika
    import sys
    
    if __name__ == '__main__':
    
        message = ' '.join(sys.argv[1:]) or "Hello World!"
    
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
        channel = connection.channel()
        channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
        channel.basic_publish(exchange='logs', routing_key='', body=str(message))
        print(" [x] Sent {}".format(message))
        connection.close()
    

    consumer.py

    import pika  
    import time
    
    def callback(ch, method, properties, body):  
        print(" [x] Received %r" % body)
        # 确认消息
        ch.basic_ack(delivery_tag=method.delivery_tag)
        time.sleep(str(body).count('.'))
          
    if __name__ == '__main__':
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
        channel = connection.channel()  
          
        # 声明一个 exchange
        channel.exchange_declare(exchange='logs', exchange_type='fanout')
        # 声明一个随机名字的 queue
        result = channel.queue_declare()  
          
        print(' [*] Waiting for messages. To exit press CTRL+C')
        # 获取 queue 的 name 
        queue_name = result.method.queue
        # 将 queue 绑定到 exchange
        channel.queue_bind(exchange='logs', queue=queue_name)
        # 设置监听的 queue
        channel.basic_consume(callback, queue=queue_name)  
        
        channel.start_consuming()
    
    • 4、使用 direct 类型exchange,难度等级 ★☆☆☆☆

    producer.py

    import pika
    import sys
    
    if __name__ == '__main__':
    
        message = ' '.join(sys.argv[1:]) or "Hello World!"
    
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
        channel = connection.channel()
        # 声明一个 direct 类型的 exchange
        channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
        # 获取 routing_key 值
        severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
        channel.basic_publish(exchange='logs_direct', routing_key=severity, body=str(message))
        print(" [x] Sent {}".format(message))
        connection.close()
    

    consumer.py

    import pika  
    import sys
    import time
    
    def callback(ch, method, properties, body):  
        print(" [x] Received %r" % body)
        # 确认消息
        ch.basic_ack(delivery_tag = method.delivery_tag)
        time.sleep(str(body).count('.'))
          
    if __name__ == '__main__':
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
        channel = connection.channel()  
          
        # 声明一个 exchange
        channel.exchange_declare(exchange='logs_direct', exchange_type='direct')
        # 声明一个随机名字的 queue
        result = channel.queue_declare()  
    
        # 设置监听的 routing_key
        severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
          
        print(' [*] Waiting for messages. To exit press CTRL+C')
        # 获取 queue 的 name 
        queue_name = result.method.queue
        # 将 queue 绑定到 exchange
        channel.queue_bind(exchange='logs_direct', queue=queue_name, routing_key=severity)
        # 设置监听的 queue
        channel.basic_consume(callback, queue=queue_name)  
        
        channel.start_consuming()
    
    • 5、使用 topic 类型exchange难度等级 ★☆☆☆☆

    producer.py

    import pika
    import sys
    
    if __name__ == '__main__':
    
        message = ' '.join(sys.argv[1:]) or "Hello World!"
    
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  
        channel = connection.channel()
        # 声明一个 topic 类型的 exchange
        channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
        # 获取 routing_key 值
        severity = sys.argv[1] if len(sys.argv) > 1 else '111.111.111'
        channel.basic_publish(exchange='logs_topic', routing_key=severity, body=str(message))
        print(" [x] Sent {}".format(message))
        connection.close()
    

    consumer.py

    import pika  
    import sys
    import time
    
    def callback(ch, method, properties, body):  
        print(" [x] Received %r" % body)
        # 确认消息
        ch.basic_ack(delivery_tag=method.delivery_tag)
        time.sleep(str(body).count('.'))
      
    if __name__ == '__main__':
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  
        channel = connection.channel()
    
        # 声明一个 exchange
        channel.exchange_declare(exchange='logs_topic', exchange_type='topic')
        # 声明一个随机名字的 queue
        result = channel.queue_declare()  
    
        # 设置监听的 routing_key
        severity = sys.argv[1] if len(sys.argv) > 1 else '111.111.111'
          
        print(' [*] Waiting for messages. To exit press CTRL+C')
        # 获取 queue 的 name 
        queue_name = result.method.queue
        # 将 queue 绑定到 exchange
        channel.queue_bind(exchange='logs_topic', queue=queue_name, routing_key=severity)
        # 设置监听的 queue
        channel.basic_consume(callback, queue=queue_name)  
        
        channel.start_consuming()
    
    五、实现细节部分
    • 1、Exchange
      • 空:直接将消息绑定到指定 queue 处理
      • fanout:广播,所有绑定到该 exchange 的 queue 都会收到消息
      • direct:定向,所有绑定到该 exchange 并且其 routing_key 也相同的 queue 能收到消息
      • topic:主题,所有绑定到该 exchange 并且 routing_key 符合其匹配的 queue 能收到消息。匹配规则如下:* (星号) 代表任意 一个单词,# (hash) 0个或者多个单词。即 111.111.111 是和 *.111.111、#.111 匹配的。
    六、RabbitMQ 常用指令
    • 1、服务器的启动与关闭·
    启动: rabbitmq-server –detached
    
    关闭: rabbitmqctl stop
    
    若单机有多个实例,则在 rabbitmqctlh 后加 –n 指定名称
    
    • 2、获取服务器状态
    服务器状态:rabbitmqctl status
    
    • 3、常用的命令
    查看所有的消息队列
    rabbitmqctl list_queues
    
    清除所有队列
    abbitmqctl reset
    
    启动应用
    rabbitmqctl start_app
    
    关闭应用
    rabbitmqctl stop_app
    
    查看所有的 Exchanges
    rabbitmqctl list_exchanges 
    
    查看所有的 bindings
    rabbitmqctl list_bindings
    

    相关文章

      网友评论

      本文标题:RabbitMQ pika简单使用

      本文链接:https://www.haomeiwen.com/subject/uqrlyxtx.html