我们讲解如何在代码中发布主题内容,然后通过订阅某主题去异步读取消息
官网:https://nsq.io/overview/quick_start.html
使用官方提供的下载地址:
go get github.com/nsqio/go-nsq
第一步,先开启nsq的服务,就是nsq监听和守护进程,还有sqnadmin,就是下面这三句话,官网上面有介绍
nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160
nsqadmin --lookupd-http-address=127.0.0.1:4161
先创建一个主题,并且发布100条消息:
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/nsqio/go-nsq"
)
var (
//nsqd的地址,使用了tcp监听的端口
tcpNsqdAddrr = "127.0.0.1:4150"
)
//传输的数据格式
type nsqMes struct {
Name string
Age int
Num int
}
// 主函数
func nsqSend() {
//初始化配置
config := nsq.NewConfig()
tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
//创建个生产者
if err != nil {
fmt.Println(err)
}
for i := 0; i < 100; i++ {
//主题
topic := "Insert"
//主题内容
// tCommand := strconv.Itoa(i)
command := nsqMes{
Name: "shitingbao",
Age: 18,
Num: i,
}
btData, err := json.Marshal(command)
if err != nil {
panic(err)
}
//发布消息
err = tPro.Publish(topic, btData)
if err != nil {
fmt.Println(err)
}
}
log.Println(time.Now().Format("2006-01-02 03:04:05"))
}
接下来我们看看admin的显示内容:
访问:127.0.0.1:4171
我们可以看到Nsqd的Topic中接收到了100条信息,100条信息都储存在内存中,没有被消化。
image.png
现在没有任何服务订阅了我们的主题,所以主题的消息都没有被消化,那我们创建一个消费者去订阅我们的主题:
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/nsqio/go-nsq"
)
var (
//nsqd的地址,使用了tcp监听的端口
tcpNsqdAddrr = "127.0.0.1:4150"
)
//声明一个结构体,实现HandleMessage接口方法(根据文档的要求)
type NsqHandler struct {
//消息数
msqCount int64
//标识ID
nsqHandlerID string
}
//传输的数据格式
type nsqMes struct {
Name string
Age int
Num int
}
//实现HandleMessage方法
//message是接收到的消息
//这个函数一定要有,不需要手动调用
//在调用AddHandler后,内部实现了该方法的自动回调,详细信息看AddHandler内部
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
//没收到一条消息+1
s.msqCount++
//打印输出信息和ID
log.Println(s.msqCount, s.nsqHandlerID)
//打印消息的一些基本信息
result := &nsqMes{}
if err := json.Unmarshal(message.Body, result); err != nil {
panic(err)
}
log.Println("time:", time.Unix(0, message.Timestamp).Format("2006-01-02 03:04:05"), "--adree:", message.NSQDAddress, "--data:", result)
return nil
}
func main() {
//初始化配置
config := nsq.NewConfig()
//创造消费者,参数一时订阅的主题,参数二是使用的通道,通道名称自己定义
com, err := nsq.NewConsumer("Insert", "channel1", config)
if err != nil {
fmt.Println(err)
}
//添加处理回调
com.AddHandler(&NsqHandler{nsqHandlerID: "One"})
//连接对应的nsqd
err = com.ConnectToNSQD(tcpNsqdAddrr)
if err != nil {
fmt.Println(err)
}
//只是为了不结束此进程,这里没有意义,如果是在web项目中,主程序不会关闭,就不用加这个select或者wgl了
// var wg = &sync.WaitGroup{}
// wg.Add(1)
// wg.Wait()
select{}
}
这里可以看到,之前挤压的100条信息,都被我们的订阅者消化掉了,也就是读取了
所以我们的订阅者(可以有多个)如果提前订阅主题的话,只要对应的主题有发布新内容,就可以马上异步读取。如果有多个通道的话,每个通道都会读取到所有的数据,比如上述函数在另外执行一个通道名称为channel2的连接,就会出现两个,发送后,该两个通道都会读取到数据。
image.png
注意注意注意:
如果是先发送数据,然后再进行连接读取的,两个连接的开始需要注意,如果像下面这么写,那就有可能第一个连接后,直接把消息都消化了,第二个开启连接后,读不到第一次的消息,需要注意
image.png
排除这个错误也很简单,就是先开启连接,然后再进行数据读取,就不会有消息丢失了
注意二
如果已经存在的通道在一个stop后,消息是没有接收到的,这时候,他的通道内的消息并没有消失,只要这个通道在,等他下一次连接到这个通道后,依然可以接受到这些消息
注意三
如果两个消费者公用一个通道,那这个通道里面的消费就是随机分配的,就是两个人一起用。比如说我这里
one 和 two 两个消费者一起用一个通道,看这个channal1里面connections,有两个连接,一共有15条数据,但是看实际输出,one和two两个消费者是都会获取到的数据,一人一半,都不完整。
网友评论