What is RabbitMQ####
RabbitMQ是一个消息代理、一个消息系统的媒介。他可以提供一个通用的消息发送和接收平台,并且保证消息在传输的过程中的安全。
注:使用pika Python客户端
核心理念####
RabbitMQ核心就是接收和发送消息,我们可以将它想象成一个邮局,我们把信件放在邮箱中,邮递员会将信件投递到你填写的收件人哪里。
寄信在这个比喻中,RabbitMQ就扮演着邮箱、邮局和邮递员的角色。
但是区别就是RabbitMQ处理的是消息(message)这种二进制数据。
RabbitMQ中专有名词####
-
生产者(Producing):就是消息产生的一方。发送消息的程序就是一个生产者。一般使用"P"表示。
Producing -
队列(Queue)就是邮箱的名称。消息通过应用程序和RabbitMQ进行传输,这些消息能够存储在一个队列中,队列没有限制,基本上是一个无限的缓冲,但是过多的消息积累也会让RabbitMQ的性能下降。
多个生产者能将消息发送到一个队列中,多个消费者也能从同一个队列中取消息。
Queue -
消费者(Consuming)和获取消息是一个意思。一个消费者就是一个等待获取消息的程序。通常把它绘制成"C"。
Consuming
Hello World!####
我们需要编写两个程序,一个程序将"Hello World!"发送到队列中(这句话说的不够严谨,生产者是不能直接将消息发送到队列中的),然后另一个程序从队列中取出后并打印消息内容。
RabbitMQ库#####
RabbitMQ使用的是AMQP协议。要使用它就不洗使用一个使用相同协议的库。python中有一个库叫做pika。
安装pika库(Linux)
$ sudo pip install pika
发送消息#####
SendMessagefile_name: send.py
第一个程序send.py会发送一个消息到队列中。首先建立一个到RabbitMQ服务器的连接connection。
# coding:utf-8
import pika
# 创建一个与RabbitMQ服务器的连接
connection=pika.BlockingConnection(pika.ConnectionParam(
'localhost'))
channel = connection.channel()
现在我们已经连接上服务器,在发送消息前我们需要确认队列是存在的,如果我们把消息发送一个不存在的队列,RabbitMQ会丢弃消息,我们先创建一个名字为hello的队列,然后将消息发送到这个队列。
# 创建了一个名字为hello的队列
channel.queue_declare(queue='hello')
已经创建了一个队列,现在可以将第一条消息发送到队列了,我们发送一个字符串。
# 发送消息的函数,excheng参数是交换机;
# routing_key是队列的名字;body是发送消息的内容。
channel.basic_publish(exchange=' ',
routing_key = 'hello',
body = 'Hello World!')
print ' [x] Sent 'Hello World!'
退出程序之前,我们需要确认网络缓冲已经被刷写,消息已经投递到RabbitMQ中,完成这件事情可以这样
connection.close()
获取消息#####
GetMessage我们第二个程序是获取队列中的消息,并打印消息。
在从队列中获取消息时,首先也是先连接到RabbitMQ服务器还要确定要获取消息的队列是已经存在的队列。
# 前面创建连接的代码和上面是一样的
channel.queue_declare(queue='hello')
为什么要重复声明队列呢?刚在发送的时候就已经创建队列了,再次创建会不会是两个队列?之所以创建两次是因为我们的发送和接收程序不知道哪一个先运行,如果接收程序中没有创建队列,先于发送程序运行,就会出现接收消息的队列不存在。
列出所有队列#####
$ sudo rabbitmqctl list_queues
在windows中进入rabbitMQ Server的sbin目录下打开cmd运行rabbitmqctl list_queues
从队列中获取消息需要为队列定义一个回调函数(callback)。当我们获取消息的时候就会调用这个回调函数。这个回掉函数将接收到的消息输出到屏幕上。
def callback(ch, method, properties, body):
print " [x] Received %r" % (body, )
下一步,我们需要告诉RabbitMQ这个回调函数将会从名字为'hello'的队列中接收消息。
# queue参数是队列的名字;no_ack参数暂时不知何用
channel.basic_consume(callback,
queue='hello',
no_ack=True)
最后,我们输入一个用来等待消息数据并且在需要的时候运行回调函数的无限循环。
print ' [*] Waiting for message. To exit press CRTL+C'
channel.start_consuming() # 开始循环
整合#####
send.py
# codin:utf-8
import pika
connection=pika.BlockingConnection(pika.ConnectionParam(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
receiver.py
# coding:utf-8
import pika
connection=pika.BlockingConnection(pika.ConnectionParaer(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
运行#####
running从上面的运行结果看,生产者发送的消息成功被消费者接收。
待续。。。####
参考文章:http://wiki.jikexueyuan.com/project/rabbitmq/hello-world.html
网友评论