美文网首页
python使用RabbitMQ之pika客户端

python使用RabbitMQ之pika客户端

作者: 哥本哈根月光 | 来源:发表于2019-02-21 16:58 被阅读0次

    RabbitMQ是比较流行的MQ(Message Queue), 下面介绍下python连接RabbitMQ的客户端pika的简单使用

    关于RabbitMQ的介绍,请参考 消息队列之RabbitMQ

    为了方便连接,先创建一个自定义的RabbitMQ连接对象。

    rabbit.py

    #!/usr/bin/env python  
    # -*- coding:utf-8 -*- 
    
    import pika
    
    
    class RabbitMQ(object):
        def __init__(self, host, port, username, password, vhost):
            self._host = host  # broker IP
            self._port = port  # broker port
            self._vhost = vhost  # vhost
            self._credentials = pika.PlainCredentials(username, password)
            self._connection = None
    
        def connect(self):
            # 连接RabbitMQ的参数对象
            parameter = pika.ConnectionParameters(self._host, self._port, self._vhost,
                                                  self._credentials, heartbeat_interval=10)
            self._connection = pika.BlockingConnection(parameter)  # 建立连接
    
        def put(self, message_str, queue_name, route_key, exchange=''):
            if self._connection is None:
                return
    
            channel = self._connection.channel()      # 获取channel
            channel.queue_declare(queue=queue_name)   # 申明使用的queue
            
            #  调用basic_publish方法向RabbitMQ发送数据, 这个方法应该只支持str类型的数据
            channel.basic_publish(
                exchange=exchange,  # 指定exchange
                routing_key=route_key,  # 指定路由
                body=message_str      # 具体发送的数据
            )
    
        def getting_start(self, queue_name):
            if self._connection is None:
                return
            channel = self._connection.channel() 
            channel.queue_declare(queue=queue_name)
            
            # 调用basic_consume方法,可以传入一个回调函数
            channel.basic_consume(self.callback,
                                  queue=queue_name,
                                  no_ack=True)
            channel.start_consuming()   # 相当于run_forever(), 当Queue中没有数据,则一直阻塞等待
    
        @staticmethod
        def callback(ch, method, properties, message_str):
            """定义一个回调函数"""
            print "[x] Received {0}".format(message_str)
    
        def close(self):
            """关闭RabbitMQ的连接"""
            if self._connection is not None:
                self._connection.close()
    

    接下来,定义生产者,向mq中发送数据

    producer.py

    #!/usr/bin/env python  
    # -*- coding:utf-8 -*- 
    
    import pika
    
    
    # credentials = pika.PlainCredentials('ethan', 'ethan123456')  # 用户名密码
    #
    # # 四个参数分别是  BrokerIP  BrokerPort, Vhost, username_and_password, 心跳时间间隔
    # parameter = pika.ConnectionParameters('127.0.0.1', 5672, '/', credentials, heartbeat_interval=0)
    #
    # connection = pika.BlockingConnection(parameter)  # 建立连接
    # channel = connection.channel()        # 获得连接的channel对象
    #
    # channel.queue_declare(queue="yanchampion")   # queue声明
    #
    # channel.basic_publish(
    #     exchange='',
    #     routing_key='yanchampion',
    #     body='Hello pika!'
    # )                                        # basic_publish方法发送消息
    #
    # print("[X] send 'Hello pika!'")
    # connection.close()                       # 关闭连接
    
    
    from rabbit import RabbitMQ
    
    # RabbitMQ类的初始化参数,包括broker_ip, port, username, password, vhost
    args = ("127.0.0.1", 5672, "ethan", "ethan123456", "/")
    mq = RabbitMQ(*args)  # 传入初始化参数
    mq.connect()   # 调用connect方法,连接broker
    
    # 调用put方法,向目标queue中发送数据, 第一个参数是data, 第二个参数是queue_name, 第三个参数是route_name
    mq.put("hello RabbitMQ!!!", "yanchampion", "yanchampion") 
    
    # 发完数据,主动关闭连接
    mq.close()
    
    

    定义消费者,从mq中获取数据
    consumer.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import pika
    #
    # credentials = pika.PlainCredentials('ethan', 'ethan123456')
    # parameter = pika.ConnectionParameters('127.0.0.1', 5672, '/', credentials, heartbeat_interval=10)
    # connection = pika.BlockingConnection(parameter)
    #
    # channel = connection.channel()
    #
    # channel.queue_declare(queue="yanchampion")
    #
    #
    # def callback(ch, method, properties, body):
    #     print "[x] Received {0}".format(body)
    #
    #
    # channel.basic_consume(callback,
    #                       queue='yan',
    #                       no_ack=True)
    #
    # print "[*] waiting for messages. To exit press CTR+CL"
    # channel.start_consuming()
    
    from rabbit import RabbitMQ
    
    # RabbitMQ类的初始化参数,包括broker_ip, port, username, password, vhost
    args = ("127.0.0.1", 5672, "ethan", "ethan123456", "/")
    mq = RabbitMQ(*args)  # 传入初始化参数
    mq.connect()   # 调用connect方法,连接broker
    
    mq.getting_start("yanchampion")  # 调用getting_start方法从queue中获取data, 传入的参数是queue_name
    
    

    以上就是pika连接RabbitMQ的简单使用。更加详细pika的使用方法未来更新,未完待续!!!

    相关文章

      网友评论

          本文标题:python使用RabbitMQ之pika客户端

          本文链接:https://www.haomeiwen.com/subject/dfldyqtx.html