经常听人说到消息队列。顾名思义,消息队列,处理的对象是消息,而队列是先进先出。我们随随便便的一个请求,可能涉及到多个服务,服务之间需要互相通信,那就是消息。消息队列是一种进程之间通信或者同一进程不同线程之间的通信方式。主要解决应用的耦合、异步消息、流量削峰。
虽然kafka很有名,但是rabbitMQ的文档更好看,对初学者很友好。
学习消息队列,我们首先要了解2个东西,生产者(Produce)和消费者(Consumer)。生产者发送消息,消费者消费消息。
Producer发送消息hello队列,然后Consumer从hello队列去获取消息
RabbitMQ是一套开源(MPL)的消息队列服务软件,它实现了高级消息队列协议(AMQP)。它提供server服务,同时支持多种语言的客户端连接。
首先我们要确定自己需要将rabbitMQ的Server服务部署在哪台(或哪几台)机器上。然后我在那台机器上下载安装rabbitMQ或者使用docker启动 。我在我本机上(mac)用docker启动。
截屏2022-04-08 下午6.14.28.png
docker命令启动如下:
# for RabbitMQ 3.9, the latest series
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
rabbitMQ服务起来之后,要使用它上面的消息队列。我们这边需要与之进行连接通讯。python官方推荐使用pika。所以我们需要下载pika
pip install pika
一切准备就绪,我们可以开始了。首先是我们的生产者producer。它需要:
1、与rabbitMQ建立连接。创建一个pika. BlockingConnection的连接对象,并创建获取一个与之通信的channel。
2、使用queue_declare声明一个队列,如果需要则创建。因为我们发送消息之前必须确保相应的队列是存在的,如果不存在,则rabbitMQ会把该消息丢弃掉。
3、通过channel发送消息到rabbitMQ Server。通常情况下,消息都不会直接发送到队列,而是会经过exchange交换机。但是RabbitMQ提供了一个默认的exchange,当使用默认的exchange时,我们可以通过routing_key直接指定发送给哪个队列。
具体代码如下:sender.py
import pika
#与broker建立连接,默认端口是5672
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建一个新的channel,可以指定channel_num,未指定的话由系统分配一个有效的。
channel = connection.channel()
#声明一个"hello"队列,如果不存在,有需要就创建
channel.queue_declare(queue='hello')
#发送5次Hello World消息
for i in range(5):
#exchange设置为' '表示使用默认的exchange,通过routing_key指定直接发送到哪个队列,routing_key的值即为queue的名称。
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!'+str(i))
print("[X] Sent 'Hello World!"+str(i))
#关闭连接
connection.close()
然后我们再看下消费者怎么消费队列的消息:
receive.py
import sys
import os
import pika
#定义一个收到消息后的回调函数,该函数需要4个参数:
# - channel: BlockingChannel,表示所在的channel
# - method: spec.Basic.Deliver
# - properties: spec.BasicProperties
# - body: bytes,表示收到的消息
def callback(ch,method,properties,body):
print("[X] Received %r" %body)
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#同样,声明一个队列,如果不存在就创建。
channel.queue_declare(queue='hello')
#调用basic_consume,指定queue和回调函数,接受消息
# auto_ack: 自动确认消息
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print("[*] Waiting for message.To exit press Ctrl+C")
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
准备就绪,我们依次启动receive.py、sender.py,运行结果如下:
sender.png receive.png
上面我们是先启动receive,等待着,然后再启动sender.如果我们先启动sender,再启动receive,打印也还是同上,只是不是发一个取一个了,而是发5个,取5个。也就是只要发送出去的,存在队列里,没有人来取的话,就在队列里等着别人来取。
当然,有时我们可能还不止一个消费者,
多个消费者
我们可以开启2个receive同时接收消息,那么它们会依次取出队列的消息,不重复。我们发送10个消息,打印如下:
send:
[X] Sent 'Hello World!0
[X] Sent 'Hello World!1
[X] Sent 'Hello World!2
[X] Sent 'Hello World!3
[X] Sent 'Hello World!4
[X] Sent 'Hello World!5
[X] Sent 'Hello World!6
[X] Sent 'Hello World!7
[X] Sent 'Hello World!8
[X] Sent 'Hello World!9
receive1:
[X] Received b'Hello World!0'
[X] Received b'Hello World!2'
[X] Received b'Hello World!4'
[X] Received b'Hello World!6'
[X] Received b'Hello World!8'
receive2:
[X] Received b'Hello World!1'
[X] Received b'Hello World!3'
[X] Received b'Hello World!5'
[X] Received b'Hello World!7'
[X] Received b'Hello World!9'
好了,以上我们就完成了一个简单的消息队列的使用了!
接下来,我们再看两个参数:auto_ack和x-single-active-consumer
auto_ack:自动确认消息标志。默认为False。上面我们设置为True,实际上就是让它在收到消息后给rabbitMQ发送一个消息确认消息,通知它我们已经收到消息了,它可以将消息从消息队列中删除了。这样我们如果receive如果中断,这些已经被处理的消息就不在队列中了。
上面的例子中,我们的回调函数很简单,只是打印出消息。但是大部分情况下,我们收到消息后,都需要做一些处理,甚至有些是耗时的密集型处理,可能需要稍等几秒。这时我们就不能使用auto_ask了,因为万一该任务突然中断,该消息还没处理完,我们就直接将其从消息队列中删掉了,我们就没办法完整地完成这条消息的处理了。我们不想丢掉任何一条消息。我们只想在我们已经处理完该条消息了之后再给rabbitMQ发送确认消息。这时,就需要用到basic_ack。在消息处理完成之后,调用:ch.basic_ack(delivery_tag=method.delivery_tag),手动发送确认消息。
如果我们忘记调用basic_ack,消息就会一直存在消息队列中吗?那倒也不是,rabbitMQ有一个默认的消息确认超时时间:30分钟。超过30分钟未收到确认,它就会因PRECONDITION_FAILED而异常关闭channel,然后再这个channel上的所有消费者,都将重新队列。
这个时间在rabbitmq.conf可定义:
# 30 minutes in milliseconds
consumer_timeout = 1800000
同时你还可以在advanced.config中完全禁用这个超时时间,只是不推荐使用
%% advanced.config
[
{rabbit, [
{consumer_timeout, undefined}
]}
].
x-single-active-consumer:单个激活状态的消费者。
这个是在声明队列的时候的参数,我们可以通过arguments参数,将x-single-active-consumer设置为True:
arguments = {"x-single-active-consumer":True}
channel.queue_declare(queue='hello1',arguments=arguments)
一旦将x-single-active-consumer设置为True,则这个队列只允许存在一个有效的消费者消费消息。上面的例子中,如果我们设置了这个参数,又启动了2个receive,则只有一个receive可以取到消息,它会取到所有的消息。
tips:如果一个队列已经创建为非x-single-active-consumer的,而你想更改其为x-single-active-consumer,上面的代码是行不通的,会报错,会提示你,声明的队列的和server上的队列不一致。
网友评论