NSQ是一个go语言实现的消息队列,每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。
nsq组件
- nsqd
- nsqlookupd
- nsqadmin
nsqd
nsqd is the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.
- 可以单独作为queue部署
- 也可以结合nsqlookupd(topic和channel发现)部署,
- 监听2个端口一个是tcp另外一个是http
nsqadmin
nsqadmin_screenshot.pngnsqadmin is a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.
是一个管理员接口,查看状态等信息
nsqlookupd
nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.
是管理的拓扑信息,并提供了最终一致发现服务的守护进程
承诺
- messages are not durable (by default)
默认情况下并不持久化消息 --mem-queue-size选项指定最大的queue size,后续分析选项的作用 - messages are delivered at least once
至少投递一次,后面我们会分析如何实现 - messages received are un-ordered
消息无序 - consumers eventually find all topic producers
最终一致性
nsq 消息推送流程
Topic
channel
consumer
- producer向nsqd发送消息时,指定topic如果topic不存在即创建一个该名称的topic;
config := nsq.NewConfig()
producer, err := nsq.NewProducer("192.168.200.151:4150", config)
if err != nil {
log.Fatal(err)
}
defer producer.Stop()
msg := "hello world"
messageBody := []byte(msg)
topicName := "test"
// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
err := producer.Publish(topicName, messageBody)
if err != nil {
log.Fatal(err)
}
- consumer向nsqlookupd注册
func (h *messageHandler) HandleMessage(m *go_nsq.Message) error {
if len(m.Body) == 0 {
return nil
}
log.Println(string(m.Body), m.ID, m.Attempts, m.NSQDAddress, m.Timestamp)
return nil
}
config := go_nsq.NewConfig()
consumer, err := go_nsq.NewConsumer("test", "test_channel22", config)
if err != nil {
log.Println(err)
}
defer consumer.Stop()
consumer.AddConcurrentHandlers(&messageHandler{}, 2)
err = consumer.ConnectToNSQLookupd("192.168.200.151:4161")
//err = consumer.ConnectToNSQD("192.168.200.151:4150")
if err != nil {
log.Fatal(err)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1)
<-sigChan
consumer通过nsqlookup的找到对应topic的所有producers,并获取producer所在的nsqd,并向nsqd注册channel和topic;nsq就是通过增加的nsqlookupd来避免SPOF。
架构图
多个consumer注册到同一个channel,一个 producer向它的本地 nsqd发送消息,要做到这点,首先要先打开一个连接( NSQ 提供 HTTP API 和 TCP 客户端 等2种方式连接到 nsqd),然后发送一个包含 topic和消息主体的发布命令(pub/mpub/publish),在这种情况下,我们将消息发布到 topic上,消息会采用多播的方式被拷贝到各个 channel中, 然后通过多个 channel以分散到我们不同需求的 consumer中。
消息推送
从下一节开始我们分析nsq的源码,分析是如何实现高性能推送服务,并分析一些参数的用途。
网友评论