美文网首页
nsq消息队列部署以及使用

nsq消息队列部署以及使用

作者: 零一间 | 来源:发表于2019-08-20 16:07 被阅读0次

    NSQ是一个实时的分布式消息平台。它的设计目标是为在多台计算机上运行的松散服务提供一个现代化的基础设施骨架。

    NSQ是由3个进程组成的:

    • nsqd 是一个接收、排队、然后转发消息到客户端的进程。
    • nsqlookupd管理拓扑信息并提供最终一致性的发现服务。
    • nsqadmin用于实时查看集群的统计数据(并且执行各种各样的管理任务)。

    1 二进制包部署(下载官方预编译版本)

    软件下载直接去官网:https://nsq.io/deployment/installing.html

    cd /usr/local/nsq-1.1.0.linux-amd64.go1.10.3/bin/
    nohup ./nsqlookupd > /dev/null 2>&1 &
    nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
    nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
    

    访问 nsqadmin:浏览器打开 http://127.0.0.1:4171(nsqadmin 默认 HTTP 端口为 4171)

    2 docker部署

    获取镜像

    docker pull nsqio/nsq
    

    启动容器

    • 运行lookupd
    ~ docker run -d --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
    450cbab82b8eb491d42bf105185c1022010b4d05e65a04f6c52ba15e1f5af06f
    
    • 获取lookupd容器的IP地址
    ~ docker inspect -f '{{ .NetworkSettings.IPAddress }}' lookupd
    172.17.0.2
    
    • 运行nsqd
    # --broadcast-address=nsqd对外广播的地址(此处填宿主机地址)
    ~ docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.1 --lookupd-tcp-address=172.17.0.2:4160
    3bc0901c8c485c351cfe31b0ef1a4fa32bf6bf148f0d74907afec6cbb1e4a034
    
    • 运行nsqadmin
    ~ docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin  --lookupd-http-address=172.17.0.2:4161
    1d4cb219b862613d42bbc0f0bd7d08146f48a32d4e68abae2073cf28ed765bb0
    

    注意:如果无法访问,请检查宿主机防火墙是否拦截了上述端口

    • 查看docker容器是否正常启动运行
    ~ docker ps -a 
    CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                            NAMES
    1d4cb219b862        nsqio/nsq           "/nsqadmin --lookupd…"   3 minutes ago       Up 3 minutes        4150-4151/tcp, 4160-4161/tcp, 4170/tcp, 0.0.0.0:4171->4171/tcp   nsqadmin
    3bc0901c8c48        nsqio/nsq           "/nsqd --broadcast-a…"   3 minutes ago       Up 3 minutes        4160-4161/tcp, 0.0.0.0:4150-4151->4150-4151/tcp, 4170-4171/tcp   nsqd
    450cbab82b8e        nsqio/nsq           "/nsqlookupd"            4 minutes ago       Up 4 minutes        4150-4151/tcp, 4170-4171/tcp, 0.0.0.0:4160-4161->4160-4161/tcp   lookupd
    
    • 访问nsqadmin

    3 docker-compose部署

    创建docker-compose.yml

    
    ➜  nsq pwd
    /root/nsq
    ➜  nsq vim docker-compose.yml
    

    文件内容如下

    version: '2'
    services:
    
      # nsqlookupd service: runs /nsqlookupd from the nsqio/nsq image on nsq-network.
      # The nsqd and nsqadmin commands in this file refer to it by the name "nsqlookupd".
      nsqlookupd:
        image: nsqio/nsq
        command: /nsqlookupd
        networks:
          - nsq-network
        hostname: nsqlookupd
        ports:
          # 4161 is the port nsqadmin's --lookupd-http-address points at.
          - "4161:4161"
          # 4160 is the port nsqd's --lookupd-tcp-address points at.
          - "4160:4160"
          
      # nsqd service: runs /nsqd from the nsqio/nsq image and registers with
      # nsqlookupd:4160; ports 4151 and 4150 are published to the host.
      nsqd:
        image: nsqio/nsq
        # -broadcast-address is set to the host machine's address.
        command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 -broadcast-address=172.17.0.1
        depends_on:
          - nsqlookupd
        hostname: nsqd
        networks:
          - nsq-network
        ports:
          - "4151:4151"
          - "4150:4150"
          
      # nsqadmin service: runs /nsqadmin from the nsqio/nsq image, pointed at
      # nsqlookupd:4161, and publishes port 4171 to the host. It is declared
      # as depending on nsqlookupd.
      nsqadmin:
        image: nsqio/nsq
        command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
        depends_on:
          - nsqlookupd
        hostname: nsqadmin
        ports:
          - "4171:4171"
        networks:
          - nsq-network
     
    # nsq-network: the bridge-driver network that every service in this file joins.
    networks:
      nsq-network:
        driver: bridge
    

    配置检查

    docker-compose config
    

    启动 docker-compose

    ➜  nsq docker-compose up -d
    Starting nsq_nsqlookupd_1_a12f31d6a776 ... done
    Starting nsq_nsqd_1_1c0db410157f       ... done
    Starting nsq_nsqadmin_1_8c94f3c4a1b7   ... done
    
    
    image.png

    客户端支持的库

    https://nsq.io/clients/client_libraries.html

    image.png

    golang客户端使用

    发送消息

    方式一:通过 nsqd 的 HTTP 接口(/pub)发送

    package main
    
    import (
        "bytes"
        "fmt"
        "net/http"
    )
    
    // main publishes the single message "haha" to the "test" topic by POSTing
    // it to the /pub endpoint on 127.0.0.1:4151. Failures are printed to stdout;
    // a non-200 response is reported with its status code.
    func main() {
        httpclient := &http.Client{}
        data := `haha`

        endpoint := fmt.Sprintf("http://127.0.0.1:%d/%s?topic=%s", 4151, "pub", "test")

        // Check the request-construction error on its own: if it is ignored,
        // the next assignment to err hides it and a nil request reaches Do.
        req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBufferString(data))
        if err != nil {
            fmt.Print(err.Error())
            return
        }

        resp, err := httpclient.Do(req)
        if err != nil {
            // Print, not Printf: the error text must not be interpreted as a
            // format string (it may contain '%').
            fmt.Print(err.Error())
            return
        }
        // Register the close immediately after the error check so the body is
        // released on every path that follows.
        defer resp.Body.Close()

        if resp.StatusCode != http.StatusOK {
            fmt.Printf("%s status code: %d", "pub", resp.StatusCode)
        }
    }
    
    

    方式二:使用 go-nsq 客户端库(通过 TCP 连接 nsqd)发送

    package main
    
    import (
        "fmt"
        "github.com/nsqio/go-nsq"
        "io/ioutil"
        "log"
        "sync"
        "time"
    )
    
    // err is a package-level error variable. Note that main declares its own
    // local err with :=, which shadows this one inside main.
    var err error
    
    // main creates a producer for the nsqd at 127.0.0.1:4150, pings it, and
    // then publishes 1000 messages to the "test" topic, sleeping 10ms after
    // each one. Every step reports its outcome on stdout.
    func main() {
        const (
            nsqdAddr = "127.0.0.1:4150"
            topic    = "test"
            total    = 1000
        )

        producer, err := nsq.NewProducer(nsqdAddr, nsq.NewConfig())
        if err != nil {
            fmt.Println("nsq.NewProducer", err)
            return
        }
        fmt.Println("nsq.NewProducer", "√")
        defer producer.Stop()

        // Route the client library's log output to a discarding writer.
        silent := log.New(ioutil.Discard, "", log.LstdFlags)
        producer.SetLogger(silent, nsq.LogLevelInfo)

        // Verify the connection before sending anything.
        if err := producer.Ping(); err != nil {
            fmt.Println("producer.Ping", err)
            return
        }
        fmt.Println("producer.Ping", "√")

        var wg sync.WaitGroup
        wg.Add(total)
        for sent := 0; sent < total; sent++ {
            // The message body is the current time formatted as MMDDhhmmss.
            sendMessage(producer, topic, time.Now().Format("0102150405"))
            // Done is called inline right after each send, so the Wait below
            // has nothing left to wait for once the loop finishes.
            wg.Done()

            time.Sleep(10 * time.Millisecond)
        }

        wg.Wait()
        fmt.Println("producer.Push.Status", "ok")
    }
    
    // sendMessage publishes msg to topicName through producer and prints the
    // outcome to stdout. A publish error is printed and not returned to the
    // caller.
    func sendMessage(producer *nsq.Producer, topicName string, msg string) {
        // Keep the error scoped to this call instead of storing it in the
        // mutable package-level err variable.
        if err := producer.Publish(topicName, []byte(msg)); err != nil {
            fmt.Println("producer.Publish", err)
            return
        }
        fmt.Println("producer.Publish", msg, "√")
    }
    
    

    消费消息

    package main
    
    import (
        "fmt"
        "github.com/nsqio/go-nsq"
        "io/ioutil"
        "log"
        "sync"
    )
    
    
    // main runs the consumer demo by calling testNSQ.
    func main() {
        testNSQ()
    }
    
    // NSQHandler is a stateless message handler; testNSQ registers an instance
    // of it on each consumer via AddHandler.
    type NSQHandler struct{}

    // HandleMessage prints the address of the nsqd the message was received
    // from together with the message body, and always returns nil.
    func (h *NSQHandler) HandleMessage(msg *nsq.Message) error {
        fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
        return nil
    }
    
    // Settings used by the consumer demo.
    const (
        // TOPIC is the topic testNSQ's consumers subscribe to.
        TOPIC   = "test"
        // CHANNEL_1 is the channel name passed to NewConsumer in testNSQ.
        CHANNEL_1 = "consumer_channel_1"
        // CHANNEL_2 is not referenced by the code shown here.
        CHANNEL_2 = "consumer_channel_2"
        // URL is the nsqd address each consumer connects to via ConnectToNSQD.
        URL     = "127.0.0.1:4150"
    )
    
    // testNSQ starts, in a background goroutine, ten consumers on
    // TOPIC/CHANNEL_1 that connect directly to the nsqd at URL, and then waits
    // on that goroutine. The goroutine blocks forever once all consumers are
    // connected, so testNSQ returns only when creating or connecting a
    // consumer fails.
    func testNSQ() {
        var waiter sync.WaitGroup
        waiter.Add(1)

        startConsumers := func() {
            defer waiter.Done()

            cfg := nsq.NewConfig()
            cfg.MaxInFlight = 10

            for n := 0; n < 10; n++ {
                c, err := nsq.NewConsumer(TOPIC, CHANNEL_1, cfg)
                if err != nil {
                    fmt.Println("err", err)
                    return
                }

                // Route the client library's log output to a discarding writer.
                c.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags), nsq.LogLevelInfo)
                c.AddHandler(&NSQHandler{})

                if err := c.ConnectToNSQD(URL); err != nil {
                    fmt.Println("err", err)
                    return
                }

                fmt.Println(CHANNEL_1, n)
            }

            // Park this goroutine indefinitely; the deferred Done never runs
            // on this path, which keeps waiter.Wait below blocked.
            select {}
        }
        go startConsumers()

        waiter.Wait()
    }
    
    

    golang实现的demo

    相关文章

      网友评论

          本文标题:nsq消息队列部署以及使用

          本文链接:https://www.haomeiwen.com/subject/vjmpsctx.html