美文网首页
RabbitMQ入门

RabbitMQ入门

作者: manbug | 来源:发表于2018-04-26 11:04 被阅读0次

    一个整体的项目正在逐步分离成各个服务,系统间的通讯采用消息队列传递,消息队列选择了RabbitMQ

    服务器上如何安装rabbit这里就不说了,看官网或者随便找个教程。
    这里简单讲述python操作rabbit的过程

    一. python操作

    1. 安装

    官网推荐的库是pika,名字很好,总想接个qiu

    pip install -i https://pypi.doubanio.com/simple pika
    
    2. 连接
    connection = BlockingConnection(ConnectionParameters(HOST, PORT, credentials=PlainCredentials(USERNAME, PASSWORD)))
    
    3. Publisher

    封装成一个class,方便调用

    class RabbitMqPublisher(object):
        # 必要参数
        def __init__(self, host, port, username, passwd):
            self.host = host
            self.port = port
            self.username = username
            self.passwd = passwd
            self.connection = None
    
        # 初始化实例(建立连接)
        def init_instance(self):
            self.connection = BlockingConnection(
                ConnectionParameters(
                    self.host,
                    self.port,
                    credentials=PlainCredentials(self.username, self.passwd)))
            self.chk_instance_init()
    
        # 检查是否成功建立连接
        def chk_instance_init(self):
            assert self.connection is not None
    
        # 测试连接
        def test_connect(self):
            return not self.connection.is_closed
    
        # 发布消息
        def publish(self, exchange_type, exchange_name, mq_body, routing_key=""):
            self.init_instance()
            channel = self.connection.channel()
            # 有两种发布队列的方式(后续普及相关知识)
            # 1 指定队列
            # channel.queue_declare(queue=mq_name)
            # channel.basic_publish(exchange="", routing_key=mq_name, body=mq_body)
            # 2 指定exchange
            channel.exchange_declare(
                exchange=exchange_name, exchange_type=exchange_type, passive=False, durable=True
            )  # passive=True: 不会新创建exchange方法
            channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=mq_body)
            self.close()
    
        # 关闭连接
        def close(self):
            self.connection.close()
    
    4. 调用
    publisher = RabbitMqPublisher(host=, port=, username=, passwd=)
    
    5. Consumer
    class RabbitMqConsumer(object):
        def __init__(self, host, port, username, passwd):
            self.host = host
            self.port = port
            self.username = username
            self.passwd = passwd
            self.connection = None
    
        def init_instance(self):
            self.connection = BlockingConnection(
                ConnectionParameters(
                    self.host,
                    self.port,
                    credentials=PlainCredentials(self.username, self.passwd)))
            self.chk_instance_init()
    
        def chk_instance_init(self):
            assert self.connection is not None
    
        def test_connect(self):
            return not self.connection.is_closed
    
        def consume(self, queue_name="han-log", exchange_name="", need_bind=False):
            self.init_instance()
            channel = self.connection.channel()
    
            def callback(ch, method, properties, body):
                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) +
                      ": [x] Received %r" % body, flush=True)
                res = json.loads(body)
    
                if res["sender"] == "HAN":
                    file = open(
                        "/var/log/mshan/daily-{}.log".format(
                            time.strftime("%Y-%m-%d", time.localtime(time.time()))
                        ), "a"
                    )
                    file.write("【{} verson-{}】INFO: {}".format(res.get("updated"), res.get("ver"), res.get("val")))
                    file.write("\n")
                    file.close()
                    ch.basic_ack(delivery_tag=method.delivery_tag)
            # 判断是否需要把exchang绑定queue(因为上述publish针对exchange,consume针对queue)
            if need_bind:
                channel.queue_declare(queue=queue_name, durable=True)
                channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key="")
    
            channel.basic_consume(callback, queue=queue_name, no_ack=False)
    
            # Start event loop
            print('Receiving return shipment events...', flush=True)
            channel.start_consuming()
    
        def close(self):
            self.connection.close()
    

    二. rabbitmq相关知识

    • Publisher
      从发布者的角度而言,可以指定两种发送queue的方式,一种是发送到exchange,由exchange决定分发到哪些队列;另一种是直接发送。

    • 流程图


      rbmq流程.jpg
    • routing_key
      这个概念没有在图中画出
      事实上这个可以理解成图中每一个Q的name
      每个exchange应该绑定某些Q(下述),这个绑定就是通过routing_key
      每个publisher发布的消息中也应该指定routing_key
      不过某些特定规则下publisher的routing_key会被忽略(下述)

    • exchange
      分发规则,可以定义名称exchange,类型exchange_type,(如果没有)是否创建passive,持久化durable

    channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, passive=False, durable=True)
    

    其中exchange_type分三种:

    1. fanout
      默认选择项,即无规则
      单纯的分发到exchange明确指定的queue中
      routing_key在此规则内不起作用
    2. direct
      一对一规则
      publisher除了指定exchange,还要指定routing_key
      只有当该exahnge绑定的队列中包含此routing_key的时候,消息才会被queue接受。
    3. topic
      灵活规则
      exchange绑定队列时可以绑定按照某种规则绑定
      如图:


      rbmq_topic.png
    • 服务器端操作
      1.查看所有的exchangerabbitmqctl list_exchanges
      2.查看所有的队列rabbitmqctl list_queues

    相关文章

      网友评论

          本文标题:RabbitMQ入门

          本文链接:https://www.haomeiwen.com/subject/zulplftx.html