回顾
上一章,我们学习了在Mac上安装RabbitMQ,Mac安装RabbitMQ。
本章,学习一下如何使用RabbitMQ这个消息队列来发送消息。
前言
python 3.6
RabbitMQ 3.7.8
pika 1.1.0
在开始之前,我们需要了解一个设计模式——”生产者消费者“模式。
生产者消费者模式
让我们看一下维基百科中的定义。
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法[1]等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
数据流向
我画了草图,如下:

生产者将产生的数据发送给RabbitMQ,然后消费者再从RabbitMQ中把数据读取出来,做进一步处理。
当然,实际环境中并不会只有一个生产者和一个消费者,而是会有多个生产者和多个消费者。比如下图:

这里每个生产者或者消费者都是一个进程。
使用场景
一开始我也费解,原本两个函数就能解决的事情,为什么要引入一个消息队列呢?这岂不是增加了程序的复杂性么?
那是因为我很少接触多进程编程,引入消息队列时为了解决进程间同步问题,也可以说是进程间通信问题。
我们知道一个程序是以进程作为基本单位的,也知道各个进程拥有各自的内存资源,进程之间如果想进行通信,需要借助其他媒介(比如文件,数据库,消息队列)。
创建第一个消息队列
首先,需要保证RabbitMQ已经被启动,可以在命令行中输入rabbitmq-server启动RabbitMQ。
$ rabbitmq-server
如果看到下图,表示启动成功。

然后需要安装一个python第三方包,方便我们操纵RabbitMQ。
pip install pika
生产者和消费者
我们可以创建两个python脚本,分别用来表示生产者(send.py)和消费者(receive.py)。
生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World1.')
print("[x] sent 'Hello World.'")
connection.close()
首先,导入pika包,然后创建一个BlockingConnection连接,这个连接是连接到本地的RabbitMQ的。
然后创建一个频道(channel),并在这个频道中调用queue_declare()方法声明一个队列,并将队列名hello传入。
接下来,调用channel的basic_publish()方法将数据传递给队列hello。
生产者无法直接给队列hello传递消息,需要借助交换机,也就是这里的exchange参数,使用''(空字符串)表示默认交换机。
routing_key表示队列的名字,body表示数据的内容。
最后,调用connection的close()方法来关闭和RabbitMQ的连接。
到此为止,我们的数据已经发送给本地的RabbitMQ了。接下来我们需要去写一个消费者脚本来获取数据。
消费者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("[x] received {}".format(body))
channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)
print("[x] waiting for messages. To exit press CTRL+C")
channel.start_consuming()
前面的步骤是一样,都是先建立连接,然后创建频道。
在第5行,我在频道中又一次声明了队列'hello',这是一种容错机制,当我们不知道是生产者脚本先运行还是消费者脚本先运行时,这样是一种推荐做法。
当然,我们现在明确的知道是生产者脚本先运行,而且已经声明了队列hello,所以不写这行也没事。
接下来,定义了一个回调函数callback,它的作用是在消费者从队列中成功读取到数据时调用的。
然后,调用channel.basic_consume()方法来获取数据,on_message_callback参数需要传入一个回调函数(就是刚刚我们写的callback函数),queue参数表示队列的名字,
auto_ack参数如果为True,”自动确认“模式会被开启。具体可以看这里。
最后,调用channel.start_consuming()方法一直消费队列中的数据,直到你按下Ctrl + C来终止。
验收结果
为了方便查看,我使用pycharm运行send.py脚本,使用命令行的方式运行receive.py脚本。
先启动send.py。如果没报错,结果会打印如下内容。

在命令行中再启动receive.py脚本。

PS:这里能看到两条记录是因为我刚刚按错了,多执行了一次send.py脚本。
然后再执行一次send.py,再回到命令行窗口里你会发现,又多了一条数据。

到目前为止,我们已经成功的走通我们的一个消息队列。
网友评论