NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用Go和Python库。
部署
安装步骤
# 下载
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 解压
tar -zxvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 启动服务
cd nsq-1.1.0.linux-amd64.go1.10.3/bin/
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
使用
1、创建一个test主题,并发送一个hello world消息
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
2、浏览器访问NSQ的管理界面: http://127.0.0.1:4171/
3 消费test主题的消息
$ ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
2019/03/13 11:09:49 INF 1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2019/03/13 11:09:49 INF 1 [test/nsq_to_file] (jinchunguang-TM1701:4150) connecting to nsqd
2019/03/13 11:09:49 INFO: opening /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
2019/03/13 11:09:49 syncing 1 records to disk
$ cat /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
hello world
客户端
生产者用php实现,消费者用go实现
生产者 采用CURL
<?php
$msg="我是世界最好的语言,谁赞成,谁反对!";
$url= "http://127.0.0.1:4151/pub?topic=test";
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
curl_setopt($ch, CURLOPT_POSTFIELDS, $msg);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
'Content-Type: text/html; charset=utf-8',
'Content-Length: ' . strlen($msg))
);
$output = curl_exec($ch);
if($output === FALSE ){
echo "CURL Error:".curl_error($ch);
}
执行脚本
$ php producer.php
OK
消费者 采用go-nsq
库实现
下载你所需要的库
go get github.com/nsqio/go-nsq
代码实现
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"sync"
)
func main() {
testNSQ()
}
type NSQHandler struct {
}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func testNSQ() {
url := "127.0.0.1:4150"
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=9
for i := 0; i<10; i++ {
consumer, err := nsq.NewConsumer("test", "struggle", config)
if nil != err {
fmt.Println("err", err)
return
}
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD(url)
if nil != err {
fmt.Println("err", err)
return
}
}
select{}
}()
waiter.Wait()
}
执行代码
$ go run consumer.go
2019/03/13 13:24:43 INF 1 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 2 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 3 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 4 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 5 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 6 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 7 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 8 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 9 [test/struggle] (127.0.0.1:4150) connecting to nsqd
2019/03/13 13:24:43 INF 10 [test/struggle] (127.0.0.1:4150) connecting to nsqd
receive 127.0.0.1:4150 message: 我是世界最好的语言,谁赞成,谁反对!
网友评论