美文网首页
RabbitMQ的消息模式

RabbitMQ的消息模式

作者: 那些年我们一起遇到过的坑 | 来源:发表于2017-12-22 14:42 被阅读0次

    RabbitMQ在应用中一定包含下面三种角色

    • 一个rabbitmq-client,作为消息发出者。
    • 一个rabbitmq-client,作为消息接收者。
    • 一个rabbitmq-server,作为中间转发broker。

    生产者 - 消费者模式

    一个消息只能被一个消费者取走
    client端充当生产者或者消费者,消息都存储在server端的消息队列中。

    客户端,不管是生产者还是消费者,都要连接到server先。

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.117.175.50'))  //连接到server端
    
    channel = connection.channel() // 
    
    channel.queue_declare(queue='hello')   //声明server端消息队列的名称
    

    对于生产者,向队列publish

    channel.basic_publish(exchange='', routing_key='hello',  body='Hello World!') //routing_key设置为queue的名称
    

    对于消费者,从队列中consume

    def callback(ch, method, properties, body):
         ...
    channel.basic_consume(callback, queue='hello', no_ack=True)   // 设计一个回调函数来处理消息。
    

    参数delivery_mode的作用

    delivery_mode = 2 意味着要publish的这条消息在server重启之后依然要不丢失。

    channel.basic_qos(prefetch_count=X)的作用

    在有多个消费者的情况下,默认的如果不设置prefetch_count,server会把消息轮流分给各个消费者去处理。

    消费者如果设置了prefetch_count=1,那么它在处理完消息之后需要发出确认ack,
    ch.basic_ack(delivery_tag = method.delivery_tag)
    server只会在收到它的ack之后才会再分给它新消息。
    server端挑选消费者的一个依据就是看消费者对应的channel上未ack的消息数是否达到其设置的prefetch_count个数。

    发布 - 订阅模式

    多播,一个消息被所有订阅者接收
    发布者可以有多个,订阅者可以有多个。

    需要设置exchange
    channel.exchange_declare(exchange='logs', type='fanout') // 设置exchange的类型

    对于消息发布者

    channel.basic_publish(exchange='logs', routing_key='', body=message)  // routing_key设置为空
    

    对于消息订阅者

    result = channel.queue_declare(exclusive=True)
    
    channel.queue_bind(exchange='logs', queue=result.method.queue)  // 绑定到特定的queue
    
    def callback(ch, method, properties, body):
      print(" [x] %r" % body)
    
    channel.basic_consume(callback, queue=result.method.queue, no_ack=True)
    
    channel.start_consuming()
    

    消息分类的发布-订阅模式

    消息是有分类的,比如log条目的级别error/warn/info。
    消息根据routing_key的设定进行Routing。
    发布者按照分类进行消息发布,订阅者按需进行消息订阅。

    对于发布者

    channel.exchange_declare(exchange='direct_logs', type='direct')
    channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)  // 需要设定当前消息的routing_key
    

    对于订阅者

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    for severity in severities:
     channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)  // 需要按需绑定routing_key
    

    消息多重分类的发布-订阅模式

    消息的分类更多,还支持对分类进行正则匹配。注意通配符和别的地方不太一样。

    • . (dot) 点符号用来隔离不同级别的分类。
    • * (star) can substitute for exactly one word.
    • # (hash) can substitute for zero or more words.

    对于发布者

    channel.exchange_declare(exchange='topic_logs', type='topic')
    
    channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) // 需要设定当前消息的
    

    对于订阅者

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    for binding_key in binding_keys:
      channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)  // 需要按需绑定routing_key
    

    RPC请求 - 响应模式

    将rabbitmq设计成一个RPC服务框架。这个设计很有意思。
    RPC请求者是主动方。PRC服务者是被动方。

    RPC请求者,首先订阅消息,回调函数是处理PRC响应值。 客户端需要主动发布消息来作为RPC请求。

    RPC服务者,首先订阅消息,回调函数是处理PRC的请求,计算,并发布消息来作为PRC的响应。

    服务者取出一条消息(作为RPC请求,来自于某个请求者),之后会再发一条消息(作为RPC响应)给那个请求者。
    所以需要设置 properties=pika.BasicProperties(correlation_id = props.correlation_id) 以记住消息请求者的身份。

    相关文章

      网友评论

          本文标题:RabbitMQ的消息模式

          本文链接:https://www.haomeiwen.com/subject/izatgxtx.html