美文网首页
nsq简易使用

nsq简易使用

作者: funcx | 来源:发表于2019-04-16 08:43 被阅读0次
    1. 启动nsqd
    2. 生产者代码tcp
    //Nsq发送测试
    package main
    
    import (
        "bufio"
        "dog/util/log"
        "fmt"
        "os"
    
        "github.com/nsqio/go-nsq"
    )
    
    var producer *nsq.Producer
    
    // 主函数
    func main() {
        strIP1 := "127.0.0.1:4150"
        // strIP2 := "127.0.0.1:4152"
        InitProducer(strIP1)
    
        running := true
    
        //读取控制台输入
        reader := bufio.NewReader(os.Stdin)
        for running {
            data, _, _ := reader.ReadLine()
            command := string(data)
            if command == "stop" {
                running = false
            }
    
            if err := Publish("test", command); err != nil {
                log.Error.Fatal(err)
            }
        }
        //关闭
        producer.Stop()
    }
    
    // 初始化生产者
    func InitProducer(str string) {
        var err error
        fmt.Println("address: ", str)
        producer, err = nsq.NewProducer(str, nsq.NewConfig())
        if err != nil {
            panic(err)
        }
    }
    
    //发布消息
    func Publish(topic string, message string) error {
        var err error
        if producer != nil {
            if message == "" { //不能发布空串,否则会导致error
                return nil
            }
            err = producer.Publish(topic, []byte(message)) // 发布消息
            return err
        }
        return fmt.Errorf("producer is nil")
    }
    
    1. 消费者代码tcp
    //Nsq接收测试
    package main
    
    import (
        "fmt"
        "time"
    
        "github.com/nsqio/go-nsq"
    )
    
    // 消费者
    type ConsumerT struct{}
    
    // 主函数
    func main() {
        InitConsumer("test", "test-channel", "127.0.0.1:4151")
        select {}
    }
    
    //处理消息
    func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
        fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
        return nil
    }
    
    //初始化消费者
    func InitConsumer(topic string, channel string, address string) {
        cfg := nsq.NewConfig()
        cfg.LookupdPollInterval = time.Second          //设置重连时间
        c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
        if err != nil {
            panic(err)
        }
        c.SetLogger(nil, 0)        //屏蔽系统日志
        c.AddHandler(&ConsumerT{}) // 添加消费者接口
    
        //建立NSQLookupd连接
        // if err := c.ConnectToNSQLookupd(address); err != nil {
        //  panic(err)
        // }
    
        //建立多个nsqd连接
        // if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil {
        //  panic(err)
        // }
    
        // 建立一个nsqd连接
        if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
            panic(err)
        }
    }
    

    相关文章

      网友评论

          本文标题:nsq简易使用

          本文链接:https://www.haomeiwen.com/subject/hzcvwqtx.html