美文网首页
golang_nsql实际例子使用

golang_nsql实际例子使用

作者: 哆啦在这A梦在哪 | 来源:发表于2020-04-11 09:08 被阅读0次

我们讲解如何在代码中发布主题内容,然后通过订阅某主题去异步读取消息
官网:https://nsq.io/overview/quick_start.html

使用官方提供的下载地址:

go get github.com/nsqio/go-nsq

第一步,先开启nsq的服务,就是nsq监听和守护进程,还有sqnadmin,就是下面这三句话,官网上面有介绍

nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160
nsqadmin --lookupd-http-address=127.0.0.1:4161

先创建一个主题,并且发布100条消息:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/nsqio/go-nsq"
)

var (
    //nsqd的地址,使用了tcp监听的端口
    tcpNsqdAddrr = "127.0.0.1:4150"
)
//传输的数据格式
type nsqMes struct {
    Name string
    Age  int
    Num  int
}

// 主函数
func nsqSend() {
    //初始化配置
    config := nsq.NewConfig()
    tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
//创建个生产者
        if err != nil {
            fmt.Println(err)
        }
    for i := 0; i < 100; i++ {
        
        //主题
        topic := "Insert"
        //主题内容
        // tCommand := strconv.Itoa(i)
        command := nsqMes{
            Name: "shitingbao",
            Age:  18,
            Num:  i,
        }
        btData, err := json.Marshal(command)
        if err != nil {
            panic(err)
        }
        //发布消息
        err = tPro.Publish(topic, btData)
        if err != nil {
            fmt.Println(err)
        }

    }
    log.Println(time.Now().Format("2006-01-02 03:04:05"))
}

接下来我们看看admin的显示内容:
访问:127.0.0.1:4171
我们可以看到Nsqd的Topic中接收到了100条信息,100条信息都储存在内存中,没有被消化。


image.png

现在没有任何服务订阅了我们的主题,所以主题的消息都没有被消化,那我们创建一个消费者去订阅我们的主题:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/nsqio/go-nsq"
)

var (
    //nsqd的地址,使用了tcp监听的端口
    tcpNsqdAddrr = "127.0.0.1:4150"
)

//声明一个结构体,实现HandleMessage接口方法(根据文档的要求)
type NsqHandler struct {
    //消息数
    msqCount int64
    //标识ID
    nsqHandlerID string
}

//传输的数据格式
type nsqMes struct {
    Name string
    Age  int
    Num  int
}

//实现HandleMessage方法
//message是接收到的消息
//这个函数一定要有,不需要手动调用
//在调用AddHandler后,内部实现了该方法的自动回调,详细信息看AddHandler内部
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
    //没收到一条消息+1
    s.msqCount++
    //打印输出信息和ID
    log.Println(s.msqCount, s.nsqHandlerID)
    //打印消息的一些基本信息
    result := &nsqMes{}
    if err := json.Unmarshal(message.Body, result); err != nil {
        panic(err)
    }
    log.Println("time:", time.Unix(0, message.Timestamp).Format("2006-01-02 03:04:05"), "--adree:", message.NSQDAddress, "--data:", result)
    return nil
}

func main() {

    //初始化配置
    config := nsq.NewConfig()
    //创造消费者,参数一时订阅的主题,参数二是使用的通道,通道名称自己定义
    com, err := nsq.NewConsumer("Insert", "channel1", config)
    if err != nil {
        fmt.Println(err)
    }
    //添加处理回调
    com.AddHandler(&NsqHandler{nsqHandlerID: "One"})
    //连接对应的nsqd
    err = com.ConnectToNSQD(tcpNsqdAddrr)
    if err != nil {
        fmt.Println(err)
    }
    //只是为了不结束此进程,这里没有意义,如果是在web项目中,主程序不会关闭,就不用加这个select或者wgl了
    // var wg = &sync.WaitGroup{}
    // wg.Add(1)
    // wg.Wait()
    select{}
}

这里可以看到,之前挤压的100条信息,都被我们的订阅者消化掉了,也就是读取了
所以我们的订阅者(可以有多个)如果提前订阅主题的话,只要对应的主题有发布新内容,就可以马上异步读取。如果有多个通道的话,每个通道都会读取到所有的数据,比如上述函数在另外执行一个通道名称为channel2的连接,就会出现两个,发送后,该两个通道都会读取到数据。


image.png

注意注意注意:

如果是先发送数据,然后再进行连接读取的,两个连接的开始需要注意,如果像下面这么写,那就有可能第一个连接后,直接把消息都消化了,第二个开启连接后,读不到第一次的消息,需要注意


image.png

排除这个错误也很简单,就是先开启连接,然后再进行数据读取,就不会有消息丢失了

注意二

如果已经存在的通道在一个stop后,消息是没有接收到的,这时候,他的通道内的消息并没有消失,只要这个通道在,等他下一次连接到这个通道后,依然可以接受到这些消息

注意三

如果两个消费者公用一个通道,那这个通道里面的消费就是随机分配的,就是两个人一起用。比如说我这里
onetwo 两个消费者一起用一个通道,看这个channal1里面connections,有两个连接,一共有15条数据,但是看实际输出,one和two两个消费者是都会获取到的数据,一人一半,都不完整。

image.png image.png

相关文章

  • golang_nsql实际例子使用

    我们讲解如何在代码中发布主题内容,然后通过订阅某主题去异步读取消息官网:https://nsq.io/overvi...

  • Redis 实际例子

    Redis 键(key) Redis 哈希(Hash) 效果如下: 列表 List 集合(set) 效果如下: 有...

  • 一个简单的Mvp+RxJava2+Retrofit天气App示例

    晴天娃娃介绍 之前通过两篇文章介绍了Mvp架构的使用 带实际例子的Android架构MVP简述一 带实际例子的An...

  • hiredis之异步调用

    下面是 hiredis 异步调用使用的一个例子(算是对于官方简单例子的一个补充,可能长得更像实际业务中使用的样子)...

  • Gunicorn

    使用 gunicorn 启动程序 其他实际应用的例子 gunicorn -b '0.0.0.0:80' 文件名:a...

  • 和taro一起做SPA 4.redux的使用

    reduct的使用 让我们通过一个实际的例子学习怎样使用reduct,首先,我们需要安装redux 实现reduc...

  • 使用例子

    stream使用例子

  • php 链接redis 实际例子

    项目环境背景: 公司的关于销售排行的数据已经达到了2千多万条,在查询数据库需要等待的时间增长。因此想优化查询速度。...

  • 事务之实际例子(废弃)

    1、前言 平时开发我们经常使用 Spring 事务,而 Spring 默认使用 mysql 的事务。mysql 事...

  • 非暴力沟通实际例子

    愤怒的小天使 …… 小天使说:“怎么开车的,我发现你一开车,就愚蠢至极,笨的很,教都教不会,还特别的危险。 (我猜...

网友评论

      本文标题:golang_nsql实际例子使用

      本文链接:https://www.haomeiwen.com/subject/ucopyctx.html