RabbitMQ浅读

作者: 写写代码唱唱歌 | 来源:发表于2016-05-12 21:12 被阅读622次

    消息分发策略

    当有多个消费者时,RabbitMQ将会轮流的将消息发送给消费者.这种分发消息的方式称为'round-robin'
    看例子
    sender.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import sys
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=sys.argv[1])
    
    print 'send done'
    connection.close()
    

    receiver.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare('hello')
    
    
    def callback(ch, method, prop, body):
        print body
    
    
    channel.basic_consume(callback, queue='hello', no_ack=True)
    channel.start_consuming()
    connection.close()
    

    运行两个消费者

    shell1$ python receiver.py
    
    shell2$ python receiver.py
    

    发送消息

    python sender.py msg1
    python sender.py msg2
    python sender.py msg3
    python sender.py msg4
    python sender.py msg5
    python sender.py msg6
    

    生产者收到的内容为
    receiver1

    shell1$ msg1
    shell1$ msg3
    shell1$ msg5
    

    receiver2

    shell1$ msg2
    shell1$ msg4
    shell1$ msg6
    


    可以看到消息是以轮询的方式发送给消费者的.

    消息容错机制

    客户端崩溃

    引入ack确认机制.客户端每成功消费一个消息,向服务器发送一个确认消息,通知服务器这条消息已经被成功消费.服务器收到消息后,将这条消息从unacknowledged中移除,否则,服务器就会一直等待客户端发来的ack消息.当客户端出现异常时,未消费的消息被重新放到消费队列中.这样避免了客户端崩溃造成的数据丢失.
    启用ack机制的消费者代码如下:

    # encoding:utf8
    __author__ = 'brianyang'
    
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare('hello1')
    
    
    def callback(ch, method, prop, body):
        print body
        time.sleep(3)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(callback, queue='hello1', no_ack=False)
    channel.start_consuming()
    connection.close()
    

    就是要保证no_ack为False,这也是no_ack的默认值.
    这里一定要显式的发送确认消息`ch.basic_ack(delivery_tag=method.delivery_tag)明确的告诉服务器消息被处理了.
    查看队列中的消息数量,可以使用

    brianyang@brianyang-Latitude-E5440:/home/q/title/test$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    hello1  6   2
    

    当消费者在消费时被强行结束时,消息并没有丢失,只要出现可用的消费者时,消息会被重新发送.

    服务端崩溃

    可以通过持久化(durable)来确保通道和消息都被保存到磁盘中进行持久化,但是由于从内存写入磁盘也需要时间,如果这段时间出现故障,则这些消息也是会丢失的.所以durable是一种弱的持久化.
    持久化需要在queue和message中声明.
    sender.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import sys
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='hello1', durable=True)
    
    channel.basic_publish(exchange='',
                          routing_key='hello1',
                          body=sys.argv[1],
                          properties=pika.BasicProperties(
                              delivery_mode=2,
                          )
    )
    
    print 'send done'
    connection.close()
    

    receiver.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare('hello1', durable=True)
    
    
    def callback(ch, method, prop, body):
        print body
        time.sleep(3)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(callback, queue='hello1', no_ack=False)
    channel.start_consuming()
    connection.close()
    

    hello1被声明为了持久化的通道,这里不能用hello命名,因为之前已经存在了一个非持久化的通道hello,RabbitMQ不允许对一个已经声明过的通道进行重定义.
    生产者在发送消息时,将消息的类型定义为delivery_mode=2,用来将消息持久化.
    通过例子来看下效果,消息持久化前,也就是没有添加durable时,重启server后消息会丢失.

    非持久化
    添加持久化选项后,重启server后消息没有丢失.
    持久化
    RabbitMQ对新入列的消息进行分配,不会考虑消费者的状态,如果两个消费者一个处理能力强,一个处理能力弱,长时间下来就会造成一个消费者消息堆积,另一个消费者相对很闲,为了公平期间,可以设置每次每个消费者只分发N个任务,只有任务收到ack后,才继续分发任务.(当no_ack为True时,这个功能失效)
    修改后的代码receiver.py内容不变,sender.py修改为
    sender.py
    # encoding:utf8
    __author__ = 'brianyang'
    
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare('hello1', durable=True)
    
    
    def callback(ch, method, prop, body):
        print body
        time.sleep(3)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='hello1', no_ack=False)
    channel.start_consuming()
    connection.close()
    

    prefetch_count=1意味着每次只为消费者分配一条消息,消费者处理完成之后,才分配新的消息

    exchange 发布/订阅模型

    exchange的作用好比:收发邮件时的邮件组.如果A,B在邮件组中,C不在邮件组中,那么当Z向邮件组发邮件时,只有A,B能收到,对于Z而言,并不关心邮件组里有谁,只负责向邮件组里发邮件,如果邮件组里没人,那么邮件就会被丢弃,当Z发了100封邮件后,C加入了邮件组,那么C只能收到从第101封开始的邮件,之前的邮件是看不到的.在这里Z是发布者,A,B,C是订阅着,邮件组是exchange.


    盗图一张

    其中P就是发布者(Z),C1,C2就是消费者(A,B,C),X就是exchange(邮件组),amq.gen-RQ6..和amq.gen_As8..就是exchange与消费者之间通信的通道(邮箱).
    上代码
    receiver.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='exg', queue=queue_name)
    
    
    def callback(ch, method, prop, body):
        print body
        time.sleep(3)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(callback, queue=queue_name, no_ack=False)
    channel.start_consuming()
    connection.close()
    

    sender.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import sys
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='exg', type='fanout')
    
    channel.basic_publish(exchange='exg',
                          routing_key='',
                          body=sys.argv[1])
    
    print 'send done'
    connection.close()
    

    下面是个例子,可以体会下与使用queue有什么不同.


    exchange

    直接使用channel时,消息是面对消费者的,每条消息都会等待消费者消费,而使用exchange时,消息是面对exchange的,对于是否有消费者通过channel与exchange绑定是未知的,exchange不会将消息保存起来等待消费者.

    路由

    使用type类型为fanout的exchange作为中转时,所有的订阅着都会收到相同的消息,假设我有两个用户,一个是vip,一个是普通用户,有些消息我只想发给vip,不想让普通用户也收到,这时候就要学习下路由功能.

    首先看下代码
    sender.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import sys
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='exchange', type='direct')
    
    channel.basic_publish(exchange='exchange',
                          routing_key=sys.argv[1],
                          body=sys.argv[2])
    
    print 'send done'
    connection.close()
    

    receiver.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import pika
    import time
    import sys
    
    exg_types = sys.argv[1:]
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    for exg_type in exg_types:
        channel.queue_bind(exchange='exchange', queue=queue_name, routing_key=exg_type)
    
    
    def callback(ch, method, prop, body):
        print method.routing_key
        print body
        time.sleep(1)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(callback, queue=queue_name, no_ack=False)
    channel.start_consuming()
    connection.close()
    

    在receiver.py中,绑定exchange与queue的操作中多了一个routing_key=exg_type,这里routing_key就是一个路由标识,只有当sender使用basic_publish指定的routing_key等于这个routing_key时,消息才会通过exchange发送到该queue中,同时要注意,声明exchange的type也变为了direct而不是之前的fanout.从字面上也很容易理解,之前的方式是广播,现在的方式是直连.
    继续盗图

    routing_pic
    通过演示看下效果
    routing

    更加复杂的路由

    当exchange的type为direct时,通过判断绑定的routing_key与发送的routing_key是否相等来判断应该将消息放入到哪个channel中.这是最简单的一种匹配方式,设想有这样一种场景,公司给员工通过队列发送消息,员工分为程序猿,前端喵和产品狗,同时员工又分为不同的级别:初级,中级和高级,员工又有不同的性别,公司对每种类别的员工的消息也不同,例如对于初级女前端,公司的祝福语为:前端的萌妹子感谢你在公司1年的付出,对于高级男程序猿,公司的祝福语为:后端的屌丝男感谢你在公司3年的付出.
    对于这种维度更加广的路由,可以使用Topics. 使用Topics也很简单,首先将exchange的type变为topic.
    topics支持通配符,如下:

    • * (star) can substitute for exactly one word. 使用*表示一个单词
    • # (hash) can substitute for zero or more words. 使用#表示0个或多个单词
      routing_key的定义必须遵循 - 由一系列英文逗号分割的单词组成
      例如上面的高级男程序猿就可以定义为:high.man.monkey,初级女前端喵定义为:low.women.cat
      如果一个channel的routing_key为
    • high.# : 接收向所有的高级员工发送的信息
    • high.*.monkey : 接收向所有的高级程序猿发送的信息
    • ..dog : 向所有产品狗发送的信息

    原谅我这么粗鲁的称呼,这只是俚语!

    通过topic的方式,可以更加精准的控制路由
    继续盗图:


    topic

    代码如下:
    sender.py

    # encoding:utf8
    __author__ = 'brianyang'
    
    import sys
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='exchange', type='topics')
    
    channel.basic_publish(exchange='exchange',
                          routing_key=sys.argv[1],
                          body=sys.argv[2])
    
    print 'send done'
    connection.close()
    

    receiver.py代码不变,看下演示:


    topic

    实现RPC

    RabbitMQ将消息放在消息队列中,可以方便的实现生产者-消费者模型.而RPC(远程过程调用)是一种构建SOA非常关键的技术,即面向服务架构.服务可以分布在集群中,通过增减机器可以方便的扩展服务的处理能力.
    RabbitMQ实现RPC的原理就是,请求服务的应用将请求参数放入到请求队列中,同时传递一个回调队列和唯一id,回调队列用来存放服务方的计算结果,唯一id用来识别是客户端的哪一次请求,需要保证唯一性.
    服务端在请求队列中获取到消息后,进行计算,计算结束后将结果放入请求方给的回调队列中,同时传回唯一id.
    盗图


    RPC_pic

    首先看下代码
    customer.py

    # encoding:utf8
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    channel.queue_declare('service_queue')
    
    
    def Fib(n):
        if n == 1 or n == 2:
            return n
        return Fib(n - 1) + Fib(n - 2)
    
    
    def Cal(ch, method, props, body):
        num = int(body)
        print 'cal {}'.format(num)
        resp = Fib(num)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=str(resp)
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(Cal, queue='service_queue')
    channel.start_consuming()
    

    sender.py

    # encoding:utf8
    
    import pika
    import uuid
    import sys
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))
    
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='service_queue',
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue,
                                             correlation_id = self.corr_id,
                                             ),
                                       body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    
    num = int(sys.argv[1])
    response = fibonacci_rpc.call(num)
    print(" [x] Requesting fib({})".format(num))
    print(" [.] Result is %r" % response)
    

    根据官网的demo稍微修改
    效果如下:

    RPC
    可以看到当客户端请求计算一个比较大的数的Fib数列值的时候,客户端和服务器都阻塞了,当另一个客户端请求计算时,由于没有消费者可以消费所以也阻塞了,这就好比在生产中单台服务器提供服务遇到了瓶颈,SOA架构可以方便的扩容,在这里就是又启了一个消费者,通过这种方法可以动态的改变集群的处理能力.
    这些内容都在官方快速入门可以看到,我只是搬运工+汉化,加深印象,RabbitMQ tutorials

    好吧,简书竟然有张图传不上去,有兴趣的来看看原文

    相关文章

      网友评论

        本文标题:RabbitMQ浅读

        本文链接:https://www.haomeiwen.com/subject/vrpprttx.html