美文网首页一些收藏
Message Queue简介与应用(Python版)

Message Queue简介与应用(Python版)

作者: 晓柒NLP与药物设计 | 来源:发表于2022-07-06 16:51 被阅读0次

    1.1 MQ简介

    1.1.1 MQ概况

    MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者(Producer)产生消息并把消息放入队列,然后由消费者(Consumer)去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

    1.1.2 MQ相较于其他传输方式的优势

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

    • 解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后发送一条MQ,其余模块消费MQ消息即可实现业务,降低模块之间的耦合。
    • 异步:主业务执行结束后从属业务通过MQ异步执行减低业务的响应时间,提高用户体验。
    • 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

    1.1.3 主要的MQ产品

    主要的MQ产品包括:RocketMQ、RabbitMQ、ActiveMQ、ZeroMQ、Kafka、IBM WebSphere 等,现在我司支持RocketMQ和Kafka。

    1.2 MQ环境搭建——(Python Rockmq.client版)

    1.2.1 rockmq.client配置

    • 先下载rockmq.client包
    wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/
    rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
    
    • 安装rpm包
    sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
    

    1.2.2 python相关包配置

    pip install rocketmq-client-python
    pip install rocketmq
    

    以上文件均可在平安云镜像http://mirrors.paic.com.cn/搜索下载

    1.3 MQ配置参数

    • PullConsumer模式
    from rocketmq.client import PullConsumer
    consumer = PullConsumer('CID_XXX')
    consumer.set_namesrv_addr('127.0.0.1:9887')
    consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') 
    consumer.start()
    for msg in consumer.pull('YOUR-TOPIC'):
        print(msg.id, msg.body)
    consumer.shutdown()
    
    • PushConsumer模式
    import time
    from rocketmq.client import PushConsumer
    def callback(msg):
        print(msg.id, msg.body)
    consumer = PushConsumer('CID_XXX')
    consumer.set_namesrv_addr('127.0.0.1:9887')
    consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
    consumer.subscribe('YOUR-TOPIC', callback)
    consumer.start()
    while True:
        time.sleep(3600)
    consumer.shutdown()
    

    参数详解

    • PullConsumer/PushConsumer:一种消费的方式

      • Push方式里,consumer把轮询过程封装了(长轮询机制),并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
      • Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
    • set_namesrv_addr:包含一个参数,用于填写目标Topic的地址与端口。

    • set_session_credentials:包含三个参数,在不使用ALIYUN的情况下为[VIRTUAL_ACCOUNT, VIRTUAL_ACCOUNT,‘’],前两个值为虚拟账号。

    • consumer.pull():包含一个参数,为用于接收消息的TOPIC。

    • msg.id:接收消息的id。

    • msg.body:接收消息的内容。

      另注:在实际生产环境中,多数使用PushConsumer消费者进行构建,因为其消息可以被快速消费,更新的实时性能较好。

    注意,所有IP均需填写,以分号相隔。

    相关文章

      网友评论

        本文标题:Message Queue简介与应用(Python版)

        本文链接:https://www.haomeiwen.com/subject/kjepbrtx.html