windows 安装 nsq https://nsq.io/deployment/installing.html
下载解压,添加系统环境变量如 D:\nsq\bin
执行命令,分别新开cmd命令窗
nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160
nsqadmin --lookupd-http-address=127.0.0.1:4161
# 发布初始消息(也在集群中创建主题)
curl -d "hello,world" "http://127.0.0.1:4151/pub?topic=test"
# 日志会写到到这里
nsq_to_file --topic=test --output-dir=D:\nsq\tmp --lookupd-http-address=127.0.0.1:4161
发布更多消息
curl -d "hello,world2" "http://127.0.0.1:4151/pub?topic=test"
curl -d "hello,world3" "http://127.0.0.1:4151/pub?topic=test"
生产者
package main
import (
"fmt"
nsq "github.com/nsqio/go-nsq"
)
func main() {
var producer *nsq.Producer
// 初始化生产者
// producer, err := nsq.NewProducer("地址:端口", nsq.*Config )
producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
if err != nil {
panic(err)
}
err = producer.Ping()
if nil != err {
// 关闭生产者
producer.Stop()
producer = nil
}
fmt.Println("ping nsq success")
// 生产者写入nsq,10条消息,topic = "test"
topic := "test"
for i := 0; i < 10; i++ {
message := fmt.Sprintf("message:%d", i)
if producer != nil && message != "" { //不能发布空串,否则会导致error
err = producer.Publish(topic, []byte(message)) // 发布消息
if err != nil {
fmt.Printf("producer.Publish,err : %v", err)
}
fmt.Println(message)
}
}
fmt.Println("producer.Publish success")
}
消费者
package main
import (
"fmt"
"time"
nsq "github.com/nsqio/go-nsq"
)
type ConsumerT struct{}
func main() {
topic := "test"
InitConsumer(topic, "test-channel", "127.0.0.1:4161")
for {
time.Sleep(time.Second * 10)
}
}
//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
//初始化消费者
func InitConsumer(topic string, channel string, address string) {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second //设置重连时间
c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
if err != nil {
panic(err)
}
c.SetLogger(nil, 0) //屏蔽系统日志
c.AddHandler(&ConsumerT{}) // 添加消费者接口
//建立NSQLookupd连接
if err := c.ConnectToNSQLookupd(address); err != nil {
panic(err)
}
//建立多个nsqd连接
// if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil {
// panic(err)
// }
// 建立一个nsqd连接
// if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
// panic(err)
// }
}
网友评论