消息队列NSQ使用

作者: 零一间 | 来源:发表于2019-03-13 13:36 被阅读0次

    NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用Go和Python库。

    部署

    官网下载地址

    安装步骤

    # 下载
    wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
    # 解压
    tar -zxvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
    # 启动服务
    cd nsq-1.1.0.linux-amd64.go1.10.3/bin/
    nohup ./nsqlookupd > /dev/null 2>&1 &
    nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
    nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
    

    使用

    1、创建一个test主题,并发送一个hello world消息

    curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
    

    2、浏览器访问NSQ的管理界面: http://127.0.0.1:4171/

    image.png

    3 消费test主题的消息

    $ ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
    2019/03/13 11:09:49 INF    1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
    2019/03/13 11:09:49 INF    1 [test/nsq_to_file] (jinchunguang-TM1701:4150) connecting to nsqd
    2019/03/13 11:09:49 INFO: opening /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
    2019/03/13 11:09:49 syncing 1 records to disk
    
    $ cat /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
    hello world
    
    

    客户端

    生产者用php实现,消费者用go实现

    生产者 采用CURL

    <?php
    
    $msg="我是世界最好的语言,谁赞成,谁反对!";
    $url= "http://127.0.0.1:4151/pub?topic=test";
    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
    curl_setopt($ch, CURLOPT_POSTFIELDS, $msg);
    curl_setopt($ch, CURLOPT_HTTPHEADER, array(
            'Content-Type: text/html; charset=utf-8',
            'Content-Length: ' . strlen($msg))
    );
    $output = curl_exec($ch);
    if($output === FALSE ){
        echo "CURL Error:".curl_error($ch);
    }
    
    

    执行脚本

    $ php producer.php 
    OK
    

    消费者 采用go-nsq库实现

    下载你所需要的库

    go get github.com/nsqio/go-nsq
    

    代码实现

    package main
    
    import (
        "fmt"
        "github.com/nsqio/go-nsq"
        "sync"
    )
    
    func main() {
        testNSQ()
    }
    
    type NSQHandler struct {
    }
    
    func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
        fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
        return nil
    }
    
    func testNSQ() {
        url := "127.0.0.1:4150"
    
        waiter := sync.WaitGroup{}
        waiter.Add(1)
    
        go func() {
            defer waiter.Done()
            config:=nsq.NewConfig()
            config.MaxInFlight=9
    
            for i := 0; i<10; i++ {
                consumer, err := nsq.NewConsumer("test", "struggle", config)
                if nil != err {
                    fmt.Println("err", err)
                    return
                }
    
                consumer.AddHandler(&NSQHandler{})
                err = consumer.ConnectToNSQD(url)
                if nil != err {
                    fmt.Println("err", err)
                    return
                }
            }
            select{}
        }()
    
        waiter.Wait()
    }
    

    执行代码

    $ go run consumer.go 
    2019/03/13 13:24:43 INF    1 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF    2 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF    3 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF    4 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF    5 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF    6 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF    7 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF    8 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF    9 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    2019/03/13 13:24:43 INF   10 [test/struggle] (127.0.0.1:4150) connecting to nsqd
    receive 127.0.0.1:4150 message: 我是世界最好的语言,谁赞成,谁反对!
    

    Nsql官网
    快速入门
    高性能消息队列NSQ
    我们是如何使用NSQ处理7500亿消息的
    消息中间件NSQ深入与实践

    相关文章

      网友评论

        本文标题:消息队列NSQ使用

        本文链接:https://www.haomeiwen.com/subject/jqlzpqtx.html