美文网首页小白入门微服务
小白入门微服务(2) - 消息队列初体验

小白入门微服务(2) - 消息队列初体验

作者: zone7_ | 来源:发表于2018-09-13 09:42 被阅读98次

    概述

    • 前言
    • 消息队列使用场景
    • 什么是消息队列
    • 常用消息队列库对比
    • Kafka 初体验
    • RabbitMQ 初体验
    • 后记

    前言

    前面两篇我们学习了

    接下来我们来学习微服务中的异步通信 - 消息队列。在这篇文章的学习中,默认你已经掌握了 Docker、docker-compose 的知识。如果你还没有掌握,可以翻阅我的历史文章。如果我的文章对你有帮助,欢迎关注、点赞、转发,这样我会更有动力做原创分享。

    消息队列的使用场景

    异步处理

    场景描述:在正常的用户注册流程中,用户注册完成都需要,发送邮件与短信,其中传统情况下,有串行与并行两种方式。

    串行

    注册成功之后,写入数据库,写完之后,再发邮件,最后再发短信。总体耗时:

    50 + 50 + 50 = 150 ms

    并行

    注册成功之后,写入数据库,并行发邮件和发短信。总体耗时:

    50 + 50 = 100 ms

    消息队列+异步

    注册成功之后,写入数据库,将写入成功的信息,发送至消息队列,由于消费者自己去消息队列中取消息,这样在写入消息之后,即可成功返回。总体耗时:

    50 + 5 = 55 ms

    三个数据一对比,谁优谁劣就很明显了

    应用解耦

    场景说明:用户下单后,需要减去库存系统中相应数量的库存。

    传统调用方式

    在传统模式下(上图),如果库存系统出现错误,则订单创建失败,而且两者过度耦合,对后面的新需求与维护也是相当大的挑战。在原有的架构上进行升级,则有下图:

    消息队列调用方式
    • 订单系统:用户下单后,订单系统进行数据持久化处理,然后将消息写入消息队列,返回订单创建成功
    • 库存系统:订阅消息,获取下单信息,库存系统根据订单信息,进行库存操作。

    当下,两者就解耦了,库存系统出现错误,也可以正常进行下单了。因为只是入门文章,应用场景就先介绍到这里。更多使用场景请自行百度、Google。

    什么是消息队列

    消息队列

    如图,P 为 producer 生产者,C 为 consumer 消费者,红色部分为消息队列。通俗地解释一下消息队列,你想象一个场景:你到报社订阅了一份报纸,报社每日生产一份新报纸,便将新报纸发往邮局并告诉邮局你的地址,邮递员将你的报纸送往你的邮箱,你便可以愉快地阅读今天的时事新闻了。当然,可能一个人订阅了好几家报社,一家报社也可以被多个人订阅。在这个场景中,消息队列就担任了,邮箱、邮局、邮递员的角色。

    常用消息队列库对比

    在网络上搜索到一个比较全面的对比图,我这边就直接引用原图,不再造轮子了。原图地址:(https://cloud.tencent.com/developer/article/1006035

    消息队列库对比
    广泛来说,电商、金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka。

    Kafka 初体验

    kafka 术语:

    • Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。
    • Topic:每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。
    • Partition:Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition。
    • Producer:负责发布消息到 Kafka broker。
    • Consumer:消息消费者,向 Kafka broker 读取消息的客户端。
    • Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

    安装单节点 kafka

    由于这里只是入门,就只使用单节点了。这里使用 Docker 来构建 kafka,其代码如下:docker-compose.yaml

    version: '3'
    services:
    #<!--定义zk层服务-->
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
    #<!--定义Kafka层-->
      kafka:
        image: wurstmeister/kafka
        depends_on: [ zookeeper ]
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: 123.45.567.89
          KAFKA_CREATE_TOPICS: "test:1:1"
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    

    docker-compose up 启动 kafaka。

    nodejs 与 python 互调

    我们先来看看,nodejs 为 producer ,python 为 comsumer 的情况:


    python consumer nodejs producer

    再看看,python 为 producer,nodejs 为consumer 的情况:


    nodejs consumer python producer

    代码展示

    nodejs producer

    var kafka = require('kafka-node'),
        Producer = kafka.Producer,
        KeyedMessage = kafka.KeyedMessage,
        client = new kafka.KafkaClient({kafkaHost: '123.45.567.89:9092'}),
        producer = new Producer(client),
        payloads = [
            { topic: 'my_favorite_topic', messages: 'produce by nodejs', partition: 0 }
        ];
    producer.on('ready', function () {
        producer.send(payloads, function (err, data) {
            console.log(data);
        });
    });
    producer.on('error', function (err) {})
    

    nodejs consumer

    var kafka = require('kafka-node'),
        Consumer = kafka.Consumer,
        client = new kafka.KafkaClient({kafkaHost: '123.45.567.89:9092'}),
        consumer = new Consumer(
            client,
            [
                {topic: 'my_favorite_topic', partition: 0}
            ],
            {
                autoCommit: false
            }
        );
    consumer.on('message', function (message) {
        console.log(message);
    });
    

    python producer

    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=['123.45.567.89:9092'],
                             api_version=(0, 10, 1)
                             )  # 此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
    for _ in range(100):
        producer.send('my_favorite_topic', b'produce by python')
    producer.close()
    

    python consumer

    from kafka import KafkaConsumer
    consumer = KafkaConsumer('my_favorite_topic', bootstrap_servers=['123.45.567.89:9092'], api_version=(0, 10, 1)
    for msg in consumer:
        print(msg)
    

    RabbitMq 初体验

    介绍过 Kafka 之后,RabbitMQ 就不在详细介绍了,请看源码了。

    后记

    不知不觉,消息队列也讲完了,要真真切切地区实践,才能真正地掌握。
    个人的知识储备总是有限的,如有错误的地方,还请大佬斧正。点击阅读原文,链接到我的知乎,我会在知乎上对文章错误的地方进行修改。

    本篇文章首发于公众号「zone7」,关注公众号获取最新推文,后台回复【小白微服务】获取源码。

    相关文章

      网友评论

        本文标题:小白入门微服务(2) - 消息队列初体验

        本文链接:https://www.haomeiwen.com/subject/tplvgftx.html