消息中间件本质上就是一种很简单的数据结构——队列,但是光队列肯定是构不成中间件的,必须要考虑性能、容灾、可靠性等等因素。
在理解什么是nsq之前,先来考虑一下,为什么要使用消息中间件
为什么使用消息中间件
简单来说,使用消息中间件就是为了解决分布式系统之间消息的传递。
假如用户在网站内注册了一个账号,需要给其发短信和邮件,至少要调用2个其他服务的接口。
register (...) {
doRegister(...);
//调用其他服务接口
sendMsg(...);
sendEmail(...);
}
这样的做法,显然不合理。
1、过度耦合:后面如果注册之后要处罚其他的动作(比如要在APP内也发一条消息),那就得去改代码,在原有的注册账号的函数末尾,在追加代码。
2、缺少缓冲:如果注册账号时,发送消息的系统恰好处于非常忙碌或者宕机的状态,那这时发送消息就会失败,我们需要一个地方,来暂时存放无法被消费的消息
3、执行顺序:每次用户注册完之后先发送短信,再发送邮件,这样的做法太low。
我们可以使用一个消息中间件,来解决上面的问题。
我们往注册中心和其他系统之间引入了一个消息中间件,或者可以先简单点,理解为引入一个队列。
当账号注册完成后,它只需要往队列中塞入(push)一条topic为“register”的消息。接着,我们的消息中间件(队列)会把这条消息推送给所有订阅了这个topic的消息的机器,告诉他们,“创建了一个新的用户,你们做自己该做的去吧”。
这样一个简单的队列,就做到了:
1、系统解耦:如果后面有新的动作,需要在注册账号后执行,那么只需要让新的动作自己去订阅topic为“register”的消息即可
2、缓冲:如果消息发送系统现在很忙,没空处理消息,那么只需跟消息中间件说,“我很忙,不要再发消息过来了”,那么消息中间件就不会给它推送消息,或者消息发送系统出了故障,消息虽然推送过去了,但是它给处理失败了,那么也只需给消息中间件回复一个“requeue”的命令,消息中间件就会把消息重新放入队列,进行重试。
3、并行执行消息发送:邮件发送系统不需要等到短信发送完之后再开始发送邮件了,他只要收到消息,就可以执行自己的操作。
认识nsq
上面使用了一个简单的队列来充当消息中间件,在分布式系统中,这显然是不可靠的。
首先,假设我的短信发送系统,部署了三台实例,他们都订阅了topic为“register”的消息,那么一旦有账号创建,这三台实例就都会收到消息,并且去发送短信,而其实我只需要发送一次就ok了。
对于这样的问题,在nsq里涉及到了一个channel的概念。
短信发送系统的三个实例,当它们收到消息时,要做的事情是一样的,并且只需要有有一个实例执行,那么它们就是一个消费者组里面的,要标识为同一个channel,比如说叫“send_msg”的channel,而邮件发送系统,也要有自己的channel,用来和短信发送系统作区分,比如说叫“send_email”。
当nsq收到消息时,会给每个channel复制一份消息,然后channel再给对应的消费者组,推送一条消息。消费者组里有多个实例,那么要推给谁呢?这就涉及到负载均衡,比如有一个消费者组里有ABC三个实例,这次推给了A,那么下次有可能是推送给B,再下次,也许就是C …
nsq官网上的一张动图,就是在解释这个过程:
nsq-topic-channel-consumer图中,nsq上有一个叫”clicks“的topic,”clicks“下面有三条channel,其中channel名称为”metrics“的,有三个实例。消息A来到nsq后,被复制到三条channel,接着,在metrics上的那个A,被推送到了第二个实例上。接着,又来了一个叫B的消息,这一次,B被推送给了第一个实例进行处理。
nsqlookup
我们已经知道,nsq收到生产者生产的消息后,需要将消息复制多份,然后推送给对应topic和channel的消费者。
那么,nsq怎么知道哪些消费者订阅了某个topic的消息呢?
我们需要一个类似于微服务里头的注册中心的模块,来实现服务发现的功能,这就是nsqlookup.
nsqlookup提供了类似于etcd、zookeeper一样的kv存储服务,里面记录了topic下面都有哪些nsq。
nsqlookup提供了一个/lookup接口,比如你想知道哪些nsq上面,有topic为order_created的消息,那么只需要调一下:
curl 'http://127.0.0.1:4161/lookup?topic=order_created'
nsqlookup就会给你返回对应topic的nsq列表:
{"channels":["send_msg"],"producers":[{"remote_address":"127.0.0.1:64402","hostname":"shuruideMacBook-Pro.local","broadcast_address":"127.0.0.1","tcp_port":4150,"http_port":4151,"version":"1.1.0"}]}
接着消费者只需要遍历返回的json串里的producers列表,把broadcast_address和tcp_port或者http_port拼起来,就可以拿到要建立连接的url地址。
消费者会和这些nsq,逐个建立连接。nsq收到对应topic的消息后,就会给和他们建立连接的消费者,推送消息。
nsq的Java客户端里,就有这样的逻辑,里面是遍历了nsqlookup的列表,然后把所有lookup的返回结构,进行合并。
com.github.brainlag.nsq.lookup.DefaultNSQLookup#lookup
接着和旧的nsq列表比较,进行删除和新增,保证本地的nsq列表数据是最新的。
com.github.brainlag.nsq.NSQConsumer#connect
这个过程会定期去执行,不断去获取最新的nsq列表。
nsq集群
nsq的集群部署非常简单,官方推荐一个生产者对应的部署一个nsqd:
基本的用法是每个nsqd作为一个producer的消息队列,producer不停地向一个nsqd推送message,然后由consumer通过lookupd连接到相应的nsqd获取message。由NSQ文档可知,producer打到某个nsqd上的message会存在nsqd所在host的内存或磁盘中,而不会扩散到其他的nsqd机器上,这样若这个nsqd所在主机宕机,则所有在这个Host上的消息就会丢失。
总结
消息中间件的应用场景
异步处理
如上面提到的用户注册的例子:
用户注册(50ms),发送邮件(50ms)和短信(50ms)
串行:(150ms)用户注册==>发送邮件==>发送短信
并行(100ms):用户注册==>发送邮件
|==>发送短信
消息中间件(56ms):用户注册(50ms) ==>(6ms)消息中间件==>发送邮件
|==>发送短信
应用解耦
对一些实时性要求不高的跨系统调用,可以考虑用消息中间件进行应用解耦
流量的削峰
比如,系统举行秒杀活动,热门商品流量蜂拥而至 。100件商品,10万人挤进来怎么办,10万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的10万个请求中的100个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被怼死.
消息通信
可以用来做一些数据一致性对比等操作。
nsq的几个重要组件
nsqlookupd
主要功能是服务发现。每个nsqd启动时都会向配置中配置的lookupd发起register请求,lookupd维护着各各节点的topic+channel的meta信息
nsqd
nsq的核心,负责消息的存储与分发。包括topic和channel的管理、producer和consumer的维护,简单的说,真正干活的就是这个服务.
nsqadmin
提供一套WEB UI,用来汇集集群的实时统计,提供比较全的集群管理功能和各节点的状态信息.(这里没有提到,在后面的实际操作可以看到)
网友评论