1. 下载与安装服务端程序
下载地址:
https://docs.nats.io/nats-server/installation#downloading-a-release-build
这里推荐下载编译好的二进制安装包,解压缩后放在一个合适的目录即可.
2. 启动服务端
image.png配置文件默认没有,可以手工创建一个server.conf 和 auth.conf, 如下所示
// server.conf 内容
listen: 0.0.0.0:4222
include ./auth.conf
// auth.conf 内容
authorization: {
user: "aa",
password: "bb"
}
// 配置文件格式和参数请参阅官方网站
3,创建发布者和订阅者程序
使用类库 : https://github.com/nats-io/nats.go go客户端文档地址
// 发布者代码片段
package main
import (
"fmt"
"github.com/nats-io/nats.go"
"log"
)
const (
DefaultUrl = "localhost:4222"
User = "aa"
PassWord = "bb"
)
func main() {
nc, err := nats.Connect(DefaultUrl, nats.UserInfo(User, PassWord))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Do something with the connection
err = nc.Publish("foo", []byte("Hello World"))
if err != nil {
fmt.Println(err.Error())
}
}
//订阅者代码片段
package main
import (
"github.com/nats-io/nats.go"
"log"
"runtime"
)
const (
DefaultUrl = "localhost:4222"
User = "aa"
PassWord = "bb"
)
func printMsg(m *nats.Msg, i int) {
log.Printf("[#%d] Received on [%s]: '%s'", i, m.Subject, string(m.Data))
}
func main() {
// Connect to NATS
//nc, err := nats.Connect(*urls, opts...)
nc, err := nats.Connect(DefaultUrl, nats.UserInfo(User, PassWord))
if err != nil {
log.Fatal(err)
}
i := 0
nc.Subscribe("foo", func(msg *nats.Msg) {
i += 1
printMsg(msg, i)
})
nc.Flush()
if err := nc.LastError(); err != nil {
log.Fatal(err)
}
runtime.Goexit()
nc.Close()
}
// 这个是queue模式,
// 在分发消息时,进行负载均衡,随机发送给同一组中的任意一个订阅者,
// 可以随时增加删除订阅者,配合响应的监控数据和统计数据,对下游的业务进行自动伸缩。
// 提高系统的可用性,避免业务在单点处理导致系统瓶颈。
package main
import (
"flag"
"log"
"os"
"os/signal"
"time"
"github.com/nats-io/nats.go"
)
// nats-qsub -s demo.nats.io <subject> <queue>
// nats-qsub -s demo.nats.io:4443 <subject> <queue> (TLS version)
const (
DefaultUrl = "localhost:4222"
User = "aa"
PassWord = "bb"
)
func usage() {
log.Printf("Usage: nats-qsub [-s server] [-t] [-h] <subject> <queue>\n")
flag.PrintDefaults()
}
func showUsageAndExit(exitcode int) {
usage()
os.Exit(exitcode)
}
func printMsg(m *nats.Msg, i int) {
log.Printf("[#%d] Received on [%s] Queue[%s] Pid[%d]: '%s'", i, m.Subject, m.Sub.Queue, os.Getpid(), string(m.Data))
}
func main() {
var urls = flag.String("s", DefaultUrl, "The nats server URLs (separated by comma)")
var showTime = flag.Bool("t", false, "Display timestamps")
var showHelp = flag.Bool("h", false, "Show help message")
log.SetFlags(0)
flag.Usage = usage
flag.Parse()
if *showHelp {
showUsageAndExit(0)
}
args := flag.Args()
if len(args) != 2 {
showUsageAndExit(1)
}
println("参数数量: ", len(args))
// Connect Options.
opts := []nats.Option{nats.Name("NATS Sample Queue Subscriber"), nats.UserInfo(User, PassWord)}
opts = setupConnOptions(opts)
nc, err := nats.Connect(*urls, opts...)
if err != nil {
log.Fatal(err)
}
subj, queue, i := args[0], args[1], 0
nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
i++
printMsg(msg, i)
})
nc.Flush()
if err := nc.LastError(); err != nil {
log.Fatal(err)
}
log.Printf("Listening on [%s], queue group [%s]", subj, queue)
if *showTime {
log.SetFlags(log.LstdFlags)
}
// Setup the interrupt handler to drain so we don't miss
// requests when scaling down.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
log.Println()
log.Printf("Draining...")
nc.Drain()
log.Fatalf("Exiting")
}
func setupConnOptions(opts []nats.Option) []nats.Option {
totalWait := 10 * time.Minute
reconnectDelay := time.Second
opts = append(opts, nats.ReconnectWait(reconnectDelay))
opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay)))
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Disconnected due to: %s, will attempt reconnects for %.0fm", err, totalWait.Minutes())
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Reconnected [%s]", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
log.Fatalf("Exiting: %v", nc.LastError())
}))
return opts
}
网友评论