nsq初识

作者: 灯火阑珊唯念沵_e0b8 | 来源:发表于2019-01-14 14:42 被阅读15次

    消息中间件本质上就是一种很简单的数据结构——队列,但是光队列肯定是构不成中间件的,必须要考虑性能、容灾、可靠性等等因素。

    在理解什么是nsq之前,先来考虑一下,为什么要使用消息中间件

    为什么使用消息中间件

    简单来说,使用消息中间件就是为了解决分布式系统之间消息的传递。
    假如用户在网站内注册了一个账号,需要给其发短信和邮件,至少要调用2个其他服务的接口。

    register (...) {
      doRegister(...);
    
      //调用其他服务接口
      sendMsg(...);
      sendEmail(...);
    }
    

    这样的做法,显然不合理。
    1、过度耦合:后面如果注册之后要处罚其他的动作(比如要在APP内也发一条消息),那就得去改代码,在原有的注册账号的函数末尾,在追加代码。
    2、缺少缓冲:如果注册账号时,发送消息的系统恰好处于非常忙碌或者宕机的状态,那这时发送消息就会失败,我们需要一个地方,来暂时存放无法被消费的消息
    3、执行顺序:每次用户注册完之后先发送短信,再发送邮件,这样的做法太low。

    我们可以使用一个消息中间件,来解决上面的问题。

    我们往注册中心和其他系统之间引入了一个消息中间件,或者可以先简单点,理解为引入一个队列。
    当账号注册完成后,它只需要往队列中塞入(push)一条topic为“register”的消息。接着,我们的消息中间件(队列)会把这条消息推送给所有订阅了这个topic的消息的机器,告诉他们,“创建了一个新的用户,你们做自己该做的去吧”。

    这样一个简单的队列,就做到了:
    1、系统解耦:如果后面有新的动作,需要在注册账号后执行,那么只需要让新的动作自己去订阅topic为“register”的消息即可
    2、缓冲:如果消息发送系统现在很忙,没空处理消息,那么只需跟消息中间件说,“我很忙,不要再发消息过来了”,那么消息中间件就不会给它推送消息,或者消息发送系统出了故障,消息虽然推送过去了,但是它给处理失败了,那么也只需给消息中间件回复一个“requeue”的命令,消息中间件就会把消息重新放入队列,进行重试。
    3、并行执行消息发送:邮件发送系统不需要等到短信发送完之后再开始发送邮件了,他只要收到消息,就可以执行自己的操作。

    认识nsq

    上面使用了一个简单的队列来充当消息中间件,在分布式系统中,这显然是不可靠的。

    首先,假设我的短信发送系统,部署了三台实例,他们都订阅了topic为“register”的消息,那么一旦有账号创建,这三台实例就都会收到消息,并且去发送短信,而其实我只需要发送一次就ok了。

    对于这样的问题,在nsq里涉及到了一个channel的概念。
    短信发送系统的三个实例,当它们收到消息时,要做的事情是一样的,并且只需要有有一个实例执行,那么它们就是一个消费者组里面的,要标识为同一个channel,比如说叫“send_msg”的channel,而邮件发送系统,也要有自己的channel,用来和短信发送系统作区分,比如说叫“send_email”。

    当nsq收到消息时,会给每个channel复制一份消息,然后channel再给对应的消费者组,推送一条消息。消费者组里有多个实例,那么要推给谁呢?这就涉及到负载均衡,比如有一个消费者组里有ABC三个实例,这次推给了A,那么下次有可能是推送给B,再下次,也许就是C …

    nsq官网上的一张动图,就是在解释这个过程:

    nsq-topic-channel-consumer

    图中,nsq上有一个叫”clicks“的topic,”clicks“下面有三条channel,其中channel名称为”metrics“的,有三个实例。消息A来到nsq后,被复制到三条channel,接着,在metrics上的那个A,被推送到了第二个实例上。接着,又来了一个叫B的消息,这一次,B被推送给了第一个实例进行处理。

    nsqlookup

    我们已经知道,nsq收到生产者生产的消息后,需要将消息复制多份,然后推送给对应topic和channel的消费者。

    那么,nsq怎么知道哪些消费者订阅了某个topic的消息呢?

    我们需要一个类似于微服务里头的注册中心的模块,来实现服务发现的功能,这就是nsqlookup.

    nsqlookup提供了类似于etcd、zookeeper一样的kv存储服务,里面记录了topic下面都有哪些nsq。
    nsqlookup提供了一个/lookup接口,比如你想知道哪些nsq上面,有topic为order_created的消息,那么只需要调一下:

    curl 'http://127.0.0.1:4161/lookup?topic=order_created'
    

    nsqlookup就会给你返回对应topic的nsq列表:

    {"channels":["send_msg"],"producers":[{"remote_address":"127.0.0.1:64402","hostname":"shuruideMacBook-Pro.local","broadcast_address":"127.0.0.1","tcp_port":4150,"http_port":4151,"version":"1.1.0"}]}
    

    接着消费者只需要遍历返回的json串里的producers列表,把broadcast_address和tcp_port或者http_port拼起来,就可以拿到要建立连接的url地址。
    消费者会和这些nsq,逐个建立连接。nsq收到对应topic的消息后,就会给和他们建立连接的消费者,推送消息。

    nsq的Java客户端里,就有这样的逻辑,里面是遍历了nsqlookup的列表,然后把所有lookup的返回结构,进行合并。
    com.github.brainlag.nsq.lookup.DefaultNSQLookup#lookup


    接着和旧的nsq列表比较,进行删除和新增,保证本地的nsq列表数据是最新的。
    com.github.brainlag.nsq.NSQConsumer#connect


    这个过程会定期去执行,不断去获取最新的nsq列表。

    nsq集群

    nsq的集群部署非常简单,官方推荐一个生产者对应的部署一个nsqd:
    基本的用法是每个nsqd作为一个producer的消息队列,producer不停地向一个nsqd推送message,然后由consumer通过lookupd连接到相应的nsqd获取message。由NSQ文档可知,producer打到某个nsqd上的message会存在nsqd所在host的内存或磁盘中,而不会扩散到其他的nsqd机器上,这样若这个nsqd所在主机宕机,则所有在这个Host上的消息就会丢失。

    nsqd

    总结

    消息中间件的应用场景

    异步处理
    如上面提到的用户注册的例子:
    用户注册(50ms),发送邮件(50ms)和短信(50ms)

    串行:(150ms)用户注册==>发送邮件==>发送短信
    
    并行(100ms):用户注册==>发送邮件
    
                       |==>发送短信
    
    消息中间件(56ms):用户注册(50ms) ==>(6ms)消息中间件==>发送邮件
                                                    |==>发送短信
    

    应用解耦
    对一些实时性要求不高的跨系统调用,可以考虑用消息中间件进行应用解耦
    流量的削峰
    比如,系统举行秒杀活动,热门商品流量蜂拥而至 。100件商品,10万人挤进来怎么办,10万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的10万个请求中的100个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被怼死.
    消息通信
    可以用来做一些数据一致性对比等操作。

    nsq的几个重要组件

    nsqlookupd
    主要功能是服务发现。每个nsqd启动时都会向配置中配置的lookupd发起register请求,lookupd维护着各各节点的topic+channel的meta信息
    nsqd
    nsq的核心,负责消息的存储与分发。包括topic和channel的管理、producer和consumer的维护,简单的说,真正干活的就是这个服务.
    nsqadmin
    提供一套WEB UI,用来汇集集群的实时统计,提供比较全的集群管理功能和各节点的状态信息.(这里没有提到,在后面的实际操作可以看到)

    相关文章

      网友评论

          本文标题:nsq初识

          本文链接:https://www.haomeiwen.com/subject/uovxdqtx.html