RabbitMQ在应用中一定包含下面三种角色
- 一个rabbitmq-client,作为消息发出者。
- 一个rabbitmq-client,作为消息接收者。
- 一个rabbitmq-server,作为中间转发broker。
生产者 - 消费者模式
一个消息只能被一个消费者取走
。
client端充当生产者或者消费者,消息都存储在server端的消息队列中。
客户端,不管是生产者还是消费者,都要连接到server先。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.117.175.50')) //连接到server端
channel = connection.channel() //
channel.queue_declare(queue='hello') //声明server端消息队列的名称
对于生产者,向队列publish
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') //routing_key设置为queue的名称
对于消费者,从队列中consume
def callback(ch, method, properties, body):
...
channel.basic_consume(callback, queue='hello', no_ack=True) // 设计一个回调函数来处理消息。
参数delivery_mode的作用
delivery_mode = 2 意味着要publish的这条消息在server重启之后依然要不丢失。
channel.basic_qos(prefetch_count=X)的作用
在有多个消费者的情况下,默认的如果不设置prefetch_count,server会把消息轮流分给各个消费者去处理。
消费者如果设置了prefetch_count=1,那么它在处理完消息之后需要发出确认ack,
ch.basic_ack(delivery_tag = method.delivery_tag)
server只会在收到它的ack之后才会再分给它新消息。
server端挑选消费者的一个依据就是看消费者对应的channel上未ack的消息数是否达到其设置的prefetch_count个数。
发布 - 订阅模式
多播,一个消息被所有订阅者接收
。
发布者可以有多个,订阅者可以有多个。
需要设置exchange
channel.exchange_declare(exchange='logs', type='fanout') // 设置exchange的类型
对于消息发布者
channel.basic_publish(exchange='logs', routing_key='', body=message) // routing_key设置为空
对于消息订阅者
result = channel.queue_declare(exclusive=True)
channel.queue_bind(exchange='logs', queue=result.method.queue) // 绑定到特定的queue
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback, queue=result.method.queue, no_ack=True)
channel.start_consuming()
消息分类的发布-订阅模式
消息是有分类的,比如log条目的级别error/warn/info。
消息根据routing_key的设定进行Routing。
发布者按照分类进行消息发布,订阅者按需进行消息订阅。
对于发布者
channel.exchange_declare(exchange='direct_logs', type='direct')
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) // 需要设定当前消息的routing_key
对于订阅者
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) // 需要按需绑定routing_key
消息多重分类的发布-订阅模式
消息的分类更多,还支持对分类进行正则匹配。注意通配符和别的地方不太一样。
- . (dot) 点符号用来隔离不同级别的分类。
- * (star) can substitute for exactly one word.
- # (hash) can substitute for zero or more words.
对于发布者
channel.exchange_declare(exchange='topic_logs', type='topic')
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) // 需要设定当前消息的
对于订阅者
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) // 需要按需绑定routing_key
RPC请求 - 响应模式
将rabbitmq设计成一个RPC服务框架。这个设计很有意思。
RPC请求者是主动方。PRC服务者是被动方。
RPC请求者,首先订阅消息,回调函数是处理PRC响应值。 客户端需要主动发布消息来作为RPC请求。
RPC服务者,首先订阅消息,回调函数是处理PRC的请求,计算,并发布消息来作为PRC的响应。
服务者取出一条消息(作为RPC请求,来自于某个请求者),之后会再发一条消息(作为RPC响应)给那个请求者。
所以需要设置 properties=pika.BasicProperties(correlation_id = props.correlation_id) 以记住消息请求者的身份。
网友评论