美文网首页
Python 实现RabbitMQ 发布订阅消息队列

Python 实现RabbitMQ 发布订阅消息队列

作者: 夜空最亮的9星 | 来源:发表于2021-03-30 09:38 被阅读0次

    消息生产(发布)者:

    # publisher.py
    import pika
    
    
    class Publisher:
    
        def __init__(self, config):
            self.config = config
    
        def publish(self, routing_key, message):
            connection = self.create_connection()
            channel = connection.channel()
            channel.exchange_declare(exchange=self.config['exchange']
                                     , exchange_type='topic')
            channel.basic_publish(exchange=self.config['exchange']
                                  , routing_key=routing_key, body=message)
    
            print("[x] Sent message %r for %r" % (message, routing_key))
    
        def create_connection(self):
            param = pika.ConnectionParameters(host=self.config['host']
                                              , port=self.config['port'])
            return pika.BlockingConnection(param)
    
    
    config = {'host': '192.168.121.83', 'port': 5672, 'exchange': 'my_exchange'}
    publisher = Publisher(config)
    
    
    while True:
        message = input('please input message:')
        publisher.publish('topic_02', message)
    
    

    消息消费(订阅)者

    import pika
    import sys
    
    
    # https://medium.com/@rahulsamant_2674/python-rabbitmq-8c1c3b79ab3d
    class Subscriber:
        def __init__(self, queueName, bindingKey, config):
            self.queueName = queueName
            self.bindingKey = bindingKey
            self.config = config
            self.connection = self._create_connection()
    
        def __del__(self):
            self.connection.close()
    
        def _create_connection(self):
            parameters = pika.ConnectionParameters(host=self.config['host'],
                                                   port=self.config['port'])
            return pika.BlockingConnection(parameters)
    
        def on_message_callback(self, channel, method, properties, body):
            binding_key = method.routing_key
    
            print("received new message for -" + binding_key)
            print(" [x] Received %r" % body)
    
    
        def setup(self):
            channel = self.connection.channel()
            channel.exchange_declare(exchange=self.config['exchange'],
                                     exchange_type='topic')
            channel.queue_declare(queue=self.queueName)
            channel.queue_bind(queue=self.queueName, exchange=self.config['exchange'], routing_key=self.bindingKey)
            channel.basic_consume(queue=self.queueName,
                                  on_message_callback=self.on_message_callback, auto_ack=True)
            print('[*] Waiting for data for ' + self.queueName + '. To exit press CTRL+C')
            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
    
    
    config = {'host': '192.168.121.83', 'port': 5672, 'exchange': 'my_exchange'}
    
    subscriber = Subscriber('hello', 'topic_01', config)
    subscriber.setup()
    
    

    相关文章

      网友评论

          本文标题:Python 实现RabbitMQ 发布订阅消息队列

          本文链接:https://www.haomeiwen.com/subject/buywhltx.html