美文网首页技术文技术干货
NSQ学习:实现有序的消息队列

NSQ学习:实现有序的消息队列

作者: imxyb | 来源:发表于2019-06-11 22:51 被阅读0次

    NSQ是一个内存+磁盘型的消息中间件,它使用push流的方式源源不断把消息推送给客户端,并且为了使服务端更加简单、高效,NSQ并不提供有序的消息队列。因此,如果对消息有顺序要求,只有两种解决办法:

    1. 改用类似kafka之类的有序消息队列;
    2. 生产者和消费者达成一个协议,比如增加一个序列号或者时间戳来表示顺序。

    本文要介绍的是第二种方法,下面就来简单用golang实现一个有序的NSQ顺序消息队列。

    首先,使用go-nsq客户端写一个main函数,并且连接上nsqlookupd:

    func main() {
        cfg := nsq.NewConfig()
        cfg.LookupdPollInterval = time.Second
        customer, err := nsq.NewConsumer("test2", "t1", cfg)
        if err != nil {
            log.Panic(err)
        }
        customer.AddHandler(&Customter{})
        customer.SetLogger(nil, 0)
        if err := customer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
            log.Panic(err)
        }
        select {}
    }
    

    这样就表示这个消费者客户端订阅了test2这个topic的t1channel了。

    接下来,我们与生产者端协商消息的格式是一个JSON文本字符串,并且这个JSON会传递一个时间戳用来表示消息的先后顺序,那么我们可以定义一个实现了sort.Interface的切片:

    type MyMessage struct {
        Name     string `json:"name"`
        CreateAt int64  `json:"create_at"`
    }
    
    type MyMessageList []MyMessage
    
    func (list MyMessageList) Len() int {
        return len(list)
    }
    
    func (list MyMessageList) Less(i, j int) bool {
        return list[i].CreateAt < list[j].CreateAt
    }
    
    func (list MyMessageList) Swap(i, j int) {
        list[i], list[j] = list[j], list[i]
    }
    

    sort.Interface有三个方法,分别是LenLessSwap。当实现了这三个方法,MyMessageList这个切片就能够把里面的MyMessage按照createAt字段的升序来进行排序。

    最后,我们定义一个Customer结构体的HandleMessage方法来接收NSQ生产者的消息:

    type Customter struct{}
    
    // 存放消息结构体
    var messageBuffer []MyMessage
    
    func (c *Customter) HandleMessage(nsqMsg *nsq.Message) error {
        var msg MyMessage
        err := json.Unmarshal(nsqMsg.Body, &msg)
        if err != nil {
            log.Panicln(err)
        }
        messageBuffer = append(messageBuffer, msg)
        if len(messageBuffer) == 3 {
            sort.Sort(MyMessageList(messageBuffer))
            // do something for the ordered message buffer
            fmt.Println(messageBuffer)
            // reset buffer
            messageBuffer = messageBuffer[:0]
        }
        return nil
    }
    

    上面代码非常简单,每次收到一条消息,我们都会先把它放到一个buffer里面,然后当buffer的长度达到3的时候,就调用sort.Sort来进行一次排序,最后就能顺利拿到3个有序的消息,就可以做相应的操作了。

    我们来简单实验一下,首先启动客户端(nsqd和nsqlookupd的启动参考官网文档,这里不介绍):

     $ ~/code/golang/src/nsq-test > go run main.go
    

    然后在终端通过curl给nsqd的test2 topic生产3条数据:

    curl -d '{"name":"aaa","create_at":1560262051}' http://127.0.0.1:4151/pub\?topic\=test2
    curl -d '{"name":"bbb","create_at":1560262060}' http://127.0.0.1:4151/pub\?topic\=test2
    curl -d '{"name":"ccc","create_at":1560262053}' http://127.0.0.1:4151/pub\?topic\=test2
    

    注意,这里按照时间戳先后应该是aaacccbbb

    最后,我们回到启动客户端的终端查看,会发现打印出正确的结果:

    # aaa ccc bbb 顺序是正确的
    [{aaa 1560262051} {ccc 1560262053} {bbb 1560262060}]
    

    到此就大功告成了~

    完整代码在此:

    package main
    
    import (
        "encoding/json"
        "fmt"
        "github.com/nsqio/go-nsq"
        "log"
        "sort"
        "time"
    )
    
    type MyMessage struct {
        Name     string `json:"name"`
        CreateAt int64  `json:"create_at"`
    }
    
    type MyMessageList []MyMessage
    
    func (list MyMessageList) Len() int {
        return len(list)
    }
    
    func (list MyMessageList) Less(i, j int) bool {
        return list[i].CreateAt < list[j].CreateAt
    }
    
    func (list MyMessageList) Swap(i, j int) {
        list[i], list[j] = list[j], list[i]
    }
    
    type Customter struct{}
    
    var messageBuffer []MyMessage
    
    func (c *Customter) HandleMessage(nsqMsg *nsq.Message) error {
        var msg MyMessage
        err := json.Unmarshal(nsqMsg.Body, &msg)
        if err != nil {
            log.Panicln(err)
        }
        messageBuffer = append(messageBuffer, msg)
        if len(messageBuffer) == 3 {
            sort.Sort(MyMessageList(messageBuffer))
            // do something for the ordered message buffer
            fmt.Println(messageBuffer)
            // reset buffer
            messageBuffer = messageBuffer[:0]
        }
        return nil
    }
    
    func main() {
        cfg := nsq.NewConfig()
        cfg.LookupdPollInterval = time.Second
        customer, err := nsq.NewConsumer("test2", "t1", cfg)
        if err != nil {
            log.Panic(err)
        }
        customer.AddHandler(&Customter{})
        customer.SetLogger(nil, 0)
        if err := customer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
            log.Panic(err)
        }
        select {}
    }
    

    相关文章

      网友评论

        本文标题:NSQ学习:实现有序的消息队列

        本文链接:https://www.haomeiwen.com/subject/urfhfctx.html