美文网首页
golang操作nsq

golang操作nsq

作者: 王宣成 | 来源:发表于2021-10-16 11:33 被阅读0次

    windows 安装 nsq https://nsq.io/deployment/installing.html
    下载解压,添加系统环境变量如 D:\nsq\bin

    执行命令,分别新开cmd命令窗
    nsqlookupd
    
    nsqd --lookupd-tcp-address=127.0.0.1:4160
    
    nsqadmin --lookupd-http-address=127.0.0.1:4161
    
    # 发布初始消息(也在集群中创建主题)
    curl -d "hello,world" "http://127.0.0.1:4151/pub?topic=test"
    
    # 日志会写到到这里
    nsq_to_file --topic=test --output-dir=D:\nsq\tmp --lookupd-http-address=127.0.0.1:4161
    
    
    发布更多消息
    curl -d "hello,world2" "http://127.0.0.1:4151/pub?topic=test"
    
    curl -d "hello,world3" "http://127.0.0.1:4151/pub?topic=test"
    
    生产者
    package main
    
    import (
        "fmt"
    
        nsq "github.com/nsqio/go-nsq"
    )
    
    func main() {
        var producer *nsq.Producer
        // 初始化生产者
        // producer, err := nsq.NewProducer("地址:端口", nsq.*Config )
        producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
        if err != nil {
            panic(err)
        }
    
        err = producer.Ping()
        if nil != err {
            // 关闭生产者
            producer.Stop()
            producer = nil
        }
    
        fmt.Println("ping nsq success")
    
        // 生产者写入nsq,10条消息,topic = "test"
        topic := "test"
        for i := 0; i < 10; i++ {
            message := fmt.Sprintf("message:%d", i)
            if producer != nil && message != "" { //不能发布空串,否则会导致error
                err = producer.Publish(topic, []byte(message)) // 发布消息
                if err != nil {
                    fmt.Printf("producer.Publish,err : %v", err)
                }
                fmt.Println(message)
            }
        }
    
        fmt.Println("producer.Publish success")
    
    }
    
    
    消费者
    package main
    
    import (
        "fmt"
        "time"
    
        nsq "github.com/nsqio/go-nsq"
    )
    
    type ConsumerT struct{}
    
    func main() {
        topic := "test"
        InitConsumer(topic, "test-channel", "127.0.0.1:4161")
        for {
            time.Sleep(time.Second * 10)
        }
    }
    
    //处理消息
    func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
        fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
        return nil
    }
    
    //初始化消费者
    func InitConsumer(topic string, channel string, address string) {
        cfg := nsq.NewConfig()
        cfg.LookupdPollInterval = time.Second          //设置重连时间
        c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
        if err != nil {
            panic(err)
        }
        c.SetLogger(nil, 0)        //屏蔽系统日志
        c.AddHandler(&ConsumerT{}) // 添加消费者接口
    
        //建立NSQLookupd连接
        if err := c.ConnectToNSQLookupd(address); err != nil {
            panic(err)
        }
    
        //建立多个nsqd连接
        // if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil {
        //  panic(err)
        // }
    
        // 建立一个nsqd连接
        // if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
        //  panic(err)
        // }
    }
    
    

    相关文章

      网友评论

          本文标题:golang操作nsq

          本文链接:https://www.haomeiwen.com/subject/nyxmoltx.html