美文网首页
NSQ源码(前言)-nsq介绍

NSQ源码(前言)-nsq介绍

作者: 日月神父 | 来源:发表于2020-02-23 16:32 被阅读0次

NSQ是一个go语言实现的消息队列,每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。

nsq文档

nsq组件

  • nsqd
  • nsqlookupd
  • nsqadmin

nsqd

nsqd is the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.

  • 可以单独作为queue部署
  • 也可以结合nsqlookupd(topic和channel发现)部署,
  • 监听2个端口一个是tcp另外一个是http

nsqadmin

nsqadmin is a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.
是一个管理员接口,查看状态等信息

nsqadmin_screenshot.png

nsqlookupd

nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.

是管理的拓扑信息,并提供了最终一致发现服务的守护进程

承诺

  1. messages are not durable (by default)
    默认情况下并不持久化消息 --mem-queue-size选项指定最大的queue size,后续分析选项的作用
  2. messages are delivered at least once
    至少投递一次,后面我们会分析如何实现
  3. messages received are un-ordered
    消息无序
  4. consumers eventually find all topic producers
    最终一致性

nsq 消息推送流程

Topic
channel
consumer

  • producer向nsqd发送消息时,指定topic如果topic不存在即创建一个该名称的topic;
config := nsq.NewConfig()
producer, err := nsq.NewProducer("192.168.200.151:4150", config)
if err != nil {
    log.Fatal(err)
}
defer producer.Stop()
msg := "hello world"
messageBody := []byte(msg)
topicName := "test"

// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
err := producer.Publish(topicName, messageBody)
if err != nil {
    log.Fatal(err)
}
  • consumer向nsqlookupd注册
func (h *messageHandler) HandleMessage(m *go_nsq.Message) error {
    if len(m.Body) == 0 {
        return nil
    }
    log.Println(string(m.Body), m.ID, m.Attempts, m.NSQDAddress, m.Timestamp)
    return nil
}
config := go_nsq.NewConfig()
consumer, err := go_nsq.NewConsumer("test", "test_channel22", config)
if err != nil {
    log.Println(err)
}
defer consumer.Stop()
consumer.AddConcurrentHandlers(&messageHandler{}, 2)
err = consumer.ConnectToNSQLookupd("192.168.200.151:4161")
//err = consumer.ConnectToNSQD("192.168.200.151:4150")
if err != nil {
    log.Fatal(err)
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1)

<-sigChan

consumer通过nsqlookup的找到对应topic的所有producers,并获取producer所在的nsqd,并向nsqd注册channel和topic;nsq就是通过增加的nsqlookupd来避免SPOF。


架构图

多个consumer注册到同一个channel,一个 producer向它的本地 nsqd发送消息,要做到这点,首先要先打开一个连接( NSQ 提供 HTTP API 和 TCP 客户端 等2种方式连接到 nsqd),然后发送一个包含 topic和消息主体的发布命令(pub/mpub/publish),在这种情况下,我们将消息发布到 topic上,消息会采用多播的方式被拷贝到各个 channel中, 然后通过多个 channel以分散到我们不同需求的 consumer中。


消息推送

从下一节开始我们分析nsq的源码,分析是如何实现高性能推送服务,并分析一些参数的用途。

相关文章

网友评论

      本文标题:NSQ源码(前言)-nsq介绍

      本文链接:https://www.haomeiwen.com/subject/yiujqhtx.html