美文网首页
RabbitMQ入门

RabbitMQ入门

作者: 转身丶即天涯 | 来源:发表于2019-08-07 16:07 被阅读0次

回顾

上一章,我们学习了在Mac上安装RabbitMQ,Mac安装RabbitMQ
本章,学习一下如何使用RabbitMQ这个消息队列来发送消息。

前言

python 3.6
RabbitMQ 3.7.8
pika 1.1.0
在开始之前,我们需要了解一个设计模式——”生产者消费者“模式。

生产者消费者模式

让我们看一下维基百科中的定义。

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法[1]等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

数据流向

我画了草图,如下:


image.png

生产者将产生的数据发送给RabbitMQ,然后消费者再从RabbitMQ中把数据读取出来,做进一步处理。

当然,实际环境中并不会只有一个生产者和一个消费者,而是会有多个生产者和多个消费者。比如下图:


image.png

这里每个生产者或者消费者都是一个进程。

使用场景

一开始我也费解,原本两个函数就能解决的事情,为什么要引入一个消息队列呢?这岂不是增加了程序的复杂性么?
那是因为我很少接触多进程编程,引入消息队列时为了解决进程间同步问题,也可以说是进程间通信问题。

我们知道一个程序是以进程作为基本单位的,也知道各个进程拥有各自的内存资源,进程之间如果想进行通信,需要借助其他媒介(比如文件,数据库,消息队列)。

创建第一个消息队列

首先,需要保证RabbitMQ已经被启动,可以在命令行中输入rabbitmq-server启动RabbitMQ。

$ rabbitmq-server

如果看到下图,表示启动成功。


image.png

然后需要安装一个python第三方包,方便我们操纵RabbitMQ。

pip install pika

生产者和消费者

我们可以创建两个python脚本,分别用来表示生产者(send.py)和消费者(receive.py)。

生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World1.')

print("[x] sent 'Hello World.'")

connection.close()

首先,导入pika包,然后创建一个BlockingConnection连接,这个连接是连接到本地的RabbitMQ的。
然后创建一个频道(channel),并在这个频道中调用queue_declare()方法声明一个队列,并将队列名hello传入。
接下来,调用channel的basic_publish()方法将数据传递给队列hello。
生产者无法直接给队列hello传递消息,需要借助交换机,也就是这里的exchange参数,使用''(空字符串)表示默认交换机。
routing_key表示队列的名字,body表示数据的内容。
最后,调用connection的close()方法来关闭和RabbitMQ的连接。

到此为止,我们的数据已经发送给本地的RabbitMQ了。接下来我们需要去写一个消费者脚本来获取数据。

消费者
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("[x] received {}".format(body))


channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)

print("[x] waiting for messages. To exit press CTRL+C")
channel.start_consuming()

前面的步骤是一样,都是先建立连接,然后创建频道。
在第5行,我在频道中又一次声明了队列'hello',这是一种容错机制,当我们不知道是生产者脚本先运行还是消费者脚本先运行时,这样是一种推荐做法。
当然,我们现在明确的知道是生产者脚本先运行,而且已经声明了队列hello,所以不写这行也没事。

接下来,定义了一个回调函数callback,它的作用是在消费者从队列中成功读取到数据时调用的。
然后,调用channel.basic_consume()方法来获取数据,on_message_callback参数需要传入一个回调函数(就是刚刚我们写的callback函数),queue参数表示队列的名字,
auto_ack参数如果为True,”自动确认“模式会被开启。具体可以看这里

最后,调用channel.start_consuming()方法一直消费队列中的数据,直到你按下Ctrl + C来终止。

验收结果

为了方便查看,我使用pycharm运行send.py脚本,使用命令行的方式运行receive.py脚本。
先启动send.py。如果没报错,结果会打印如下内容。


image.png

在命令行中再启动receive.py脚本。


image.png

PS:这里能看到两条记录是因为我刚刚按错了,多执行了一次send.py脚本。

然后再执行一次send.py,再回到命令行窗口里你会发现,又多了一条数据。


image.png

到目前为止,我们已经成功的走通我们的一个消息队列。

相关文章

  • MyBatis-从查询昨天的数据说起

    前段时间写了《RabbitMQ入门》系列RabbitMQ入门-初识RabbitMQ RabbitMQ入门-从Hel...

  • 消息服务 - RabbitMQ 基础入门

    rabbitmq RabbitMQ官方入门教程 本文算是实现对入门教程的 java版本翻译吧。本文中演示代码地址 ...

  • RabbitMQ基础

    0. 前言 什么是消息中间件 安装RabbitMQ 编写RabbitMQ的入门程序 RabbitMQ的5种模式特征...

  • RabbitMQ入门-高效的Work模式

    扛不住的Hello World模式 上篇《RabbitMQ入门-从HelloWorld开始》介绍了RabbitMQ...

  • RabbitMQ 入门

    1:windows 下rabbitMQ 的安装 2:介绍 3:RabbitMQ 常见术语 3:java 入门实例 ...

  • RabbitMQ入门-从HelloWorld开始

    从读者的反馈谈RabbitMQ 昨天发完《RabbitMQ入门-初识RabbitMQ》,我陆陆续续收到一些反馈。鉴...

  • RabbitMq相关文章索引(1)

    基本常识 rabbitmq百度百科 RabbitMQ用户角色及权限控制 rabbitMQ入门详解,大神勿喷。。。自...

  • RabbitMQ学习

    MQ入门总结(五)RabbitMQ的原理和使用Spring Boot 中使用 RabbitMQ -- 很棒Cent...

  • RabbitMQ 学习-direct模式

    RabbitMQ direct模式 关于它的快速入门,可以在这里查看:http://www.rabbitmq.co...

  • RabbitMQ入门

    [toc] 一:入门 1.安装Erlang 2.安装RabbitMQ 3.配置 激活 RabbitMQ's Man...

网友评论

      本文标题:RabbitMQ入门

      本文链接:https://www.haomeiwen.com/subject/zswodctx.html