美文网首页
nsqd && Node.js

nsqd && Node.js

作者: 暮归倾城 | 来源:发表于2021-03-13 18:20 被阅读0次

    公司的node项目,需要brew install nsq,然后每次运行终端都要单独开一个tab执行nsqd,nsqd是什么?

    1、NSQ是什么

    • 基于Go语言的分布式消息队列,是一个消息中间件。类似的中间件还有RabbitMQ、阿里开发的RocketMQ、Kafka等。
    • 特点:轻量,入手简单,功能相对少,大部分情况下,无论是性能还是功能基本够用。

    为什么要使用消息中间件

    摘自:MQ(1)—— 从队列到消息中间件

    假设我们在淘宝下了一笔订单后,淘宝后台需要做这些事情:

    • 消息通知系统:通知商家,你有一笔新的订单,请及时发货
    • 推荐系统:更新用户画像,重新给用户推荐他可能感兴趣的商品
    • 会员系统:更新用户的积分和等级信息

    • 于是一个创建订单的函数,至少需要取调用三个其他服务的接口

    这种实现方式的问题是:

    1. 过度耦合:如果后面创建订单时,需要触发新的动作,那就得去改代码,在原有的创建订单函数末尾,再追加一行代码
    2. 缺少缓冲:如果创建订单时,会员系统恰好处于非常忙碌或者宕机的状态,那这时更新会员信息就会失败,我们需要一个地方,来暂时存放无法被消费的消息

    队列

    我们在订单系统和其他系统的中间,引入了一个消息中间件,或者说,引入了一条队列。

    当订单系统创建完订单时,它只需要往队列里,塞入(push)一条topic为“order_created”的消息。
    接着,我们的nsq1.0,会把这条消息,再推送给所有订阅了这个topic的消息的机器,告诉他们,“有新的订单,你们该干嘛干嘛”。

    这样一个简单的队列,就解决了上面的两个问题:

    解耦:如果后面有新的动作,需要在创建订单后执行,那么只需要让新同学自己去订阅topic为“order_created”的消息即可

    缓冲:如果会员系统现在很忙,没空处理消息,那么只需跟nsq说,“我很忙,不要再发消息过来了”,那么nsq就不会给它推送消息,或者会员系统出了故障,消息虽然推送过去了,但是它给处理失败了,那么也只需给nsq回复一个“requeue”的命令,nsq就会把消息重新放入队列,进行重试。

    channel

    作为一个靠谱的中间件,你必须做到:高效、可靠、方便。

    假设我的会员系统,部署了三台实例,他们都订阅了topic为“order_created”的消息,那么一旦有订单创建,这三台实例就都会收到消息,并且去更新会员积分信息,而其实我只需要更新一次就ok了。

    这就涉及到一个消费者组(Comsumer Group)的概念。消费者组是Kafka里提到的,在Nsq,对应的术语是channel。

    会员系统的三个实例,当它们收到消息时,要做的事情是一样的,并且只需要有有一个实例执行,那么它们就是一个消费者组里面的,要标识为同一个channel,比如说叫“update_memeber_credit”的channel,而短信系统和推荐系统,也要有自己的channel,用来和会员系统作区分,比如说叫“send_msg”和“update_user_interesting_goods”

    当nsq收到消息时,会给每个channel复制一份消息,然后channel再给对应的消费者组,推送一条消息。消费者组里有多个实例,那么要推给谁呢?这就涉及到负载均衡,比如有一个消费者组里有ABC三个实例,这次推给了A,那么下次有可能是推送给B,再下次,也许就是C …

    看下图,topic是消息,channel是消费者组,consumer是消费组里的单个示例

    v2-e7c2d63d53f9486acf031c307df86e7f_b.gif

    2、NSQD是什么

    NSQ 是由四个重要组件构成:

    nsqd :一个负责接收、排队、转发消息到客户端的守护进程
    nsqlookupd :管理拓扑信息并提供最终一致性的发现服务的守护进程
    nsqadmin :一套 Web 用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
    utilities :常见基础功能、数据流处理工具,如 nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

    作为一个靠谱的中间件,你必须支持集群部署,这样才能实现可靠、高效。

    nsq的集群部署非常简单,官方推荐一个生产者对应的部署一个nsqd。

    这样的做法有不少坏处,如果生产者对应的nsq挂掉了,那它就生产不了消息了。而且每个生产者都要部署一个nsq,未免有些奢侈。

    不过对于大多数业务来说,这样的nsq已经够用。

    3、使用

    官方和第三方还为 NSQ 开发了众多客户端功能库,如官方提供的基于 HTTP 的 nsqd 、Go 客户端 go-nsq 、Python 客户端 pynsq 、基于 Node.js 的 JavaScript 客户端 nsqjs 、异步 C 客户端 libnsq 、Java 客户端 nsq-java 以及基于各种语言的众多第三方客户端功能库。

    简单看一下Node.js中nsqjs的使用:

    发送:
    const nsq = require('nsqjs');
    
    let w = new nsq.Writer(
            '127.0.0.1', //address,
            4150, //port, 
          );
    w.connect();
    w.on('ready', function() {
     
    });
    
    const topic = 'test_topic';
    const msg = {
     path: '/test/path',
     query: {
          paramsA: 'a',
          paramsB: 'b',
     },
    w.publish(topic, msg, function(err) {
      
    });
    
    
    接收:
    const nsq = require('nsqjs');
    
    const topic = 'test_topic';
    const channel = 'main';
    const options = {nsqdTCPAddresses: ['127.0.0.1:4150']};
    
    const reader = new nsq.Reader(topic, channel, options);
    reader.connect();
    reader.on('message', function(msg) {
        try {
          var data = msg.json();
          debug(data);
        } catch (err) {
          console.log(err);
        }
    }
    
    

    4、参考文章

    MQ(1)—— 从队列到消息中间件

    NSQ 简介

    NSQ:分布式的实时消息平台

    相关文章

      网友评论

          本文标题:nsqd && Node.js

          本文链接:https://www.haomeiwen.com/subject/ikgxcltx.html