美文网首页
golang操作nsq

golang操作nsq

作者: 王宣成 | 来源:发表于2021-10-16 11:33 被阅读0次

windows 安装 nsq https://nsq.io/deployment/installing.html
下载解压,添加系统环境变量如 D:\nsq\bin

执行命令,分别新开cmd命令窗
nsqlookupd

nsqd --lookupd-tcp-address=127.0.0.1:4160

nsqadmin --lookupd-http-address=127.0.0.1:4161

# 发布初始消息(也在集群中创建主题)
curl -d "hello,world" "http://127.0.0.1:4151/pub?topic=test"

# 日志会写到到这里
nsq_to_file --topic=test --output-dir=D:\nsq\tmp --lookupd-http-address=127.0.0.1:4161

发布更多消息
curl -d "hello,world2" "http://127.0.0.1:4151/pub?topic=test"

curl -d "hello,world3" "http://127.0.0.1:4151/pub?topic=test"
生产者
package main

import (
    "fmt"

    nsq "github.com/nsqio/go-nsq"
)

func main() {
    var producer *nsq.Producer
    // 初始化生产者
    // producer, err := nsq.NewProducer("地址:端口", nsq.*Config )
    producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
    if err != nil {
        panic(err)
    }

    err = producer.Ping()
    if nil != err {
        // 关闭生产者
        producer.Stop()
        producer = nil
    }

    fmt.Println("ping nsq success")

    // 生产者写入nsq,10条消息,topic = "test"
    topic := "test"
    for i := 0; i < 10; i++ {
        message := fmt.Sprintf("message:%d", i)
        if producer != nil && message != "" { //不能发布空串,否则会导致error
            err = producer.Publish(topic, []byte(message)) // 发布消息
            if err != nil {
                fmt.Printf("producer.Publish,err : %v", err)
            }
            fmt.Println(message)
        }
    }

    fmt.Println("producer.Publish success")

}

消费者
package main

import (
    "fmt"
    "time"

    nsq "github.com/nsqio/go-nsq"
)

type ConsumerT struct{}

func main() {
    topic := "test"
    InitConsumer(topic, "test-channel", "127.0.0.1:4161")
    for {
        time.Sleep(time.Second * 10)
    }
}

//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

//初始化消费者
func InitConsumer(topic string, channel string, address string) {
    cfg := nsq.NewConfig()
    cfg.LookupdPollInterval = time.Second          //设置重连时间
    c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
    if err != nil {
        panic(err)
    }
    c.SetLogger(nil, 0)        //屏蔽系统日志
    c.AddHandler(&ConsumerT{}) // 添加消费者接口

    //建立NSQLookupd连接
    if err := c.ConnectToNSQLookupd(address); err != nil {
        panic(err)
    }

    //建立多个nsqd连接
    // if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil {
    //  panic(err)
    // }

    // 建立一个nsqd连接
    // if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
    //  panic(err)
    // }
}

相关文章

  • golang操作nsq

    windows 安装 nsq https://nsq.io/deployment/installing.html[...

  • 20年第45周:nsq操作

    一、NSQ架构 二、Nsq操作

  • golang nsq

    1.安装 https://nsq.io/deployment/installing.html 2.快速入门 在一个...

  • golang使用Nsq

    为什么要使用Nsq 最近一直在寻找一个高性能,高可用的消息队列做内部服务之间的通讯。一开始想到用zeromq,但在...

  • 23.NSQ

    NSQ是目前比较流行的一个分布式的消息队列,本文主要介绍了NSQ及Go语言如何操作NSQ。 组件: nsqdloo...

  • nsq源码(11) nsqlookupd与消费者交互

    nsq一共提供了几种消费者客户端工具:nsq_to_file、nsq_to_http、nsq_to_nsq nsq...

  • Nsq从入门到实践

    当nsq跑起来之后, 我们可能会遇到以下问题 分布式部署 处理错误(何时requeue) 如何使用golang l...

  • golang积累-WaitGroup包装

    在看NSQ源码时看到封装了waitgroup方法,很实用,于是网上找了一篇文章,顺带整个流程熟悉一遍 golang...

  • Go消息中间件Nsq系列(四)------apps/nsq_to

    上一篇: Go消息中间件Nsq系列(三)------apps/nsq_to_nsq源码阅读 apps/nsq_to...

  • Go map底层实现

    golang map源码详解Golang map 如何进行删除操作?

网友评论

      本文标题:golang操作nsq

      本文链接:https://www.haomeiwen.com/subject/nyxmoltx.html