1.1 MQ简介
1.1.1 MQ概况
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者(Producer)产生消息并把消息放入队列,然后由消费者(Consumer)去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。
1.1.2 MQ相较于其他传输方式的优势
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
- 解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后发送一条MQ,其余模块消费MQ消息即可实现业务,降低模块之间的耦合。
- 异步:主业务执行结束后从属业务通过MQ异步执行减低业务的响应时间,提高用户体验。
- 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
1.1.3 主要的MQ产品
主要的MQ产品包括:RocketMQ、RabbitMQ、ActiveMQ、ZeroMQ、Kafka、IBM WebSphere 等,现在我司支持RocketMQ和Kafka。
1.2 MQ环境搭建——(Python Rockmq.client版)
1.2.1 rockmq.client配置
- 先下载rockmq.client包
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/
rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
- 安装rpm包
sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
1.2.2 python相关包配置
pip install rocketmq-client-python
pip install rocketmq
以上文件均可在平安云镜像http://mirrors.paic.com.cn/搜索下载
1.3 MQ配置参数
- PullConsumer模式
from rocketmq.client import PullConsumer
consumer = PullConsumer('CID_XXX')
consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')
consumer.start()
for msg in consumer.pull('YOUR-TOPIC'):
print(msg.id, msg.body)
consumer.shutdown()
- PushConsumer模式
import time
from rocketmq.client import PushConsumer
def callback(msg):
print(msg.id, msg.body)
consumer = PushConsumer('CID_XXX')
consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
consumer.subscribe('YOUR-TOPIC', callback)
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()
参数详解
-
PullConsumer/PushConsumer:一种消费的方式
- Push方式里,consumer把轮询过程封装了(长轮询机制),并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
- Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
-
set_namesrv_addr:包含一个参数,用于填写目标Topic的地址与端口。
-
set_session_credentials:包含三个参数,在不使用ALIYUN的情况下为[VIRTUAL_ACCOUNT, VIRTUAL_ACCOUNT,‘’],前两个值为虚拟账号。
-
consumer.pull():包含一个参数,为用于接收消息的TOPIC。
-
msg.id:接收消息的id。
-
msg.body:接收消息的内容。
另注:在实际生产环境中,多数使用PushConsumer消费者进行构建,因为其消息可以被快速消费,更新的实时性能较好。
注意,所有IP均需填写,以分号相隔。
网友评论