golang-nsq系列(一)--初识

作者: 热爱coding的稻草 | 来源:发表于2019-08-18 21:04 被阅读2次

    nsq 最初是由 bitly 公司开源出来的一款简单易用的分布式消息中间件,它可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息。

    nsq

    它具有以下特性:

    分布式。它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和高可用特性。

    易于扩展。它支持水平扩展,没有中心化的消息代理(Broker),内置的发现服务让集群中增加节点非常容易。

    运维方便。它非常容易配置和部署,灵活性高。

    高度集成。现在已经有官方的GolangPythonJavaScript客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。

    1. 首先到官方文档看用法:

    https://nsq.io/overview/quick_start.html

    quick_start

    下载对应的二进制可执行文件,在本地按照上述步骤就可以跑起来了,看下nsqadmin 后台展示如下:

    nsqadmin

    2. docker 环境搭建 nsq

    参考官方提供资料创建:docker-compose.yml

    version: '2' # 高版本支持3
    services:
      nsqlookupd:
        image: nsqio/nsq
        command: /nsqlookupd
        ports:
          - "4160:4160" # tcp
          - "4161:4161" # http
    
      nsqd:
        image: nsqio/nsq
        # 广播地址不填的话默认就是oshostname(或虚拟机名称),那样 lookupd 会连接不上,所以直接写IP
        command: /nsqd --broadcast-address=10.236.92.208 --lookupd-tcp-address=nsqlookupd:4160
        depends_on:
          - nsqlookupd
        ports:
          - "4150:4150" # tcp
          - "4151:4151" # http
    
      nsqadmin:
        image: nsqio/nsq
        command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
        depends_on:
          - nsqlookupd
        ports:
          - "4171:4171" # http
    

    执行 docker-compose up -d 生成对应的三个容器:

    nsqgo_nsqd_1

    nsqgo_nsqlookupd_1

    nsqgo_nsqadmin_1

    3. Golang 使用 nsq

    创建生产者:producer.go

    package main
    
    import (
      "fmt"
      "log"
      "time"
    
      "github.com/nsqio/go-nsq"
    )
    
    func main() {
      config := nsq.NewConfig()
      p, err := nsq.NewProducer("127.0.0.1:4150", config)
    
      if err != nil {
        log.Panic(err)
      }
    
      for i := 0; i < 1000; i++ {
        msg := fmt.Sprintf("num-%d", i)
        log.Println("Pub:" + msg)
        err = p.Publish("testTopic", []byte(msg))
        if err != nil {
          log.Panic(err)
        }
        time.Sleep(time.Second * 1)
      }
    
      p.Stop()
    }
    

    循环写 1000 个 num-1--1000,通过 p.Publish 发送到消息队列中,等待消费。

    创建消费者:consumer.go

    package main
    
    import (
      "log"
      "sync"
    
      "github.com/nsqio/go-nsq"
    )
    
    func main() {
      wg := &sync.WaitGroup{}
      wg.Add(1000)
    
      config := nsq.NewConfig()
      c, _ := nsq.NewConsumer("testTopic", "ch", config)
      c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        log.Printf("Got a message: %s", message.Body)
        wg.Done()
        return nil
      }))
    
      // 1.直连nsqd
      // err := c.ConnectToNSQD("127.0.0.1:4150")
    
      // 2.通过 nsqlookupd 服务发现
      err := c.ConnectToNSQLookupd("127.0.0.1:4161")
      if err != nil {
        log.Panic(err)
      }
      wg.Wait()
    }
    

    可通过两种方式与 nsqd 连接:

    1). 直连 nsqd,适用于单机(standalone)版;

    2). 通过 nsqlookupd 服务发现,适用于集群(cluster)版;

    输出结果:

    go run producer.go

    2019/08/18 20:29:51 Pub:num-0
    2019/08/18 20:29:51 INF    1 (127.0.0.1:4150) connecting to nsqd
    2019/08/18 20:29:52 Pub:num-1
    2019/08/18 20:29:53 Pub:num-2
    2019/08/18 20:29:54 Pub:num-3
    2019/08/18 20:29:55 Pub:num-4
    2019/08/18 20:29:56 Pub:num-5
    2019/08/18 20:29:57 Pub:num-6
    2019/08/18 20:29:58 Pub:num-7
    2019/08/18 20:29:59 Pub:num-8
    2019/08/18 20:30:00 Pub:num-9
    2019/08/18 20:30:01 Pub:num-10
    

    go run consumer.go

    2019/08/18 20:30:08 INF    1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
    2019/08/18 20:30:08 INF    1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
    2019/08/18 20:30:08 Got a message: num-0
    2019/08/18 20:30:08 Got a message: num-1
    2019/08/18 20:30:08 Got a message: num-2
    2019/08/18 20:30:08 Got a message: num-3
    2019/08/18 20:30:08 Got a message: num-4
    2019/08/18 20:30:08 Got a message: num-5
    2019/08/18 20:30:08 Got a message: num-6
    2019/08/18 20:30:08 Got a message: num-7
    2019/08/18 20:30:08 Got a message: num-8
    2019/08/18 20:30:08 Got a message: num-9
    2019/08/18 20:30:08 Got a message: num-10
    

    github 源码下载:

    https://github.com/astraw99/nsq_go

    【小结】

    a. 单个nsqd可以有多个topic,每个topic可以有多个channel。channel接收这个topic所有消息的副本,从而实现多播分发,而channel上的每个消息被均匀的分发给它的订阅者,从而实现负载均衡;

    b. nsq 专门为分布式、集群化而生,在处理 SPOF(single point of failure, 单点故障)、高可用、最终一致性方面很有优势。

    稻草人生

    相关文章

      网友评论

        本文标题:golang-nsq系列(一)--初识

        本文链接:https://www.haomeiwen.com/subject/goudsctx.html