在分布式应用中经常需要实现服务间的通信,本文我们使用NATS消息中间件来实现服务间的不同通信方式。
准备工作
首先创建一个Go项目。注意:本文所介绍的例子运行在Linux/MacOS操作系统环境,但NATS也支持windows系统。
go mod init example
安装nats包:
go get github.com/nats-io/nats.go/@latest
我们将使用以下目录结构:
.
├── cmd
│ ├── publish-subscribe
│ │ └── main.go
│ ├── request-reply
│ │ └── main.go
│ └── queue-groups
│ └── main.go
├── go.mod
└── go.sum
启动本地nats服务:
docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats
发布订阅模式
发布订阅模式NATS实现了消息的发布和订阅一对多模式。发布者在一个主题上发送消息,在该主题上的任何订阅者都可以收到消息。这种1:N一对多模式也称为:fan-out。
订阅者还可以在主题中使用通配符,优点类似正则表达式。例如:
- foo.*可以匹配foo.bar和foo.baz。
- fo o.*.bar匹配foo.a.bar和foo.b.bar。
- foo.>匹配上面所有主题。
消息大小有限制(在nats服务的max_payload配置参数中设置)。默认是1MB,但可以设置最大为64MB。但NATS开发团队推荐最大值设置小点比如8MB。
为什么需要这种通信模式?
发布订阅是很常见的使用场景,可以用于发送消息到不同的服务。
代码
我们在cmd/publish-subscribe/main.go文件中写该模式代码,首先初始化NATS客户端。
nc, err := nats.Connect(nats.DEFAULT_ENCODER)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
使用3个订阅者订阅foo主题,可以实现一个fan-out模式。
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscribe 1:", string(msg.Data))
})
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscribe 2:", string(msg.Data))
})
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscribe 3:", string(msg.Data))
})
向foo主题发布消息并等待。
if err := nc.Publish("foo", []byte("Here's some stuff")); err != nil {
log.Fatal(err)
}
time.Sleep(2 * time.Second)
完整例子如下,NATS发布消息非常简单。
package main
import (
"github.com/nats-io/nats.go"
"log"
"time"
)
func main() {
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscribe 1:", string(msg.Data))
})
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscribe 2:", string(msg.Data))
})
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Subscribe 3:", string(msg.Data))
})
if err := nc.Publish("foo", []byte("Here's some stuff")); err != nil {
log.Fatal(err)
}
time.Sleep(2 * time.Second)
}
执行结果
如你所见,消息被发送到所有的订阅者。
go run cmd/publish-subscribe/main.go
2022/05/04 12:07:44 Subscribe 3: Here's some stuff
2022/05/04 12:07:44 Subscribe 1: Here's some stuff
2022/05/04 12:07:44 Subscribe 2: Here's some stuff
请求应答模式
请求应答模式请求应答(Request-Reply)在分布式系统中也是很常见的通信模式。客户端发送一个请求,会在一定时间内异步等待接收应答消息。
NATS使请求-应答变得简单而强大,并支持一些强大的特性,比如位置透明、扩和缩容、可观察性等等。
为什么需要这种模式?
有时服务间需要一对一的通信,请求应答就非常适合。
代码
在cmd/reques-reply/main.go文件中写该模式的代码,还是以初始化NATS客户端代码开始:
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalln(err)
}
defer nc.Close()
订阅foo主题,添加一些日志并对接收到消息时提供应答:
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Request receive:", string(msg.Data))
msg.Respond([]byte("Here you go"))
})
我们还可以使用不同的应答主题,客户端可以向只响应特定请求者的服务发出请求,创建1对1的关系。
下面使用NATS客户端的Request方法。包含三个参数:主题、请求内容(字节数组)、请求超时时间。
以下是完整代码:
package main
import (
"github.com/nats-io/nats.go"
"log"
"time"
)
func main() {
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
nc.Subscribe("foo", func(msg *nats.Msg) {
log.Println("Request receive:", string(msg.Data))
msg.Respond([]byte("Here you go"))
})
reply, err := nc.Request("foo", []byte("Give me data"), 10*time.Second)
if err != nil {
log.Fatal(err)
}
log.Println("Got reply:", string(reply.Data))
}
执行结果
正如预期的那样,我们的请求收到了,订阅者用一些数据响应了请求。
$ go run cmd/request-reply/main.go
2022/05/04 12:25:21 Request receive: Give me data
2022/05/04 12:25:21 Got reply: Here you go
队列订阅模式
队列订阅模式NATS提供了一种称为分布式队列功能内置负载平衡。使用队列订阅者将在一组订阅者之间平衡消息发送,这组订阅者可用于提供应用程序容错和扩展工作负载。
为什么需要这种模式
队列订阅是扩展服务的理想选择。扩展就和运行一个新应用程序一样简单,缩容可以向正在运行的应用发送信号来停止服务。这种灵活性和无需任何配置更改特点使NATS成为一种优秀的服务通信技术,可以与所有平台技术一起使用。NATS的一个重要特性是,队列组由应用程序及其队列订阅者组成,而不是在服务器端配置。
代码
要创建一个订阅队列,订阅者需要注册一个队列名。所有包含相同队列名的订阅者组成一个组。无需任何配置。当发布已注册主题的消息时,NATS服务从订阅组中随机选择一个成员接收消息。尽管队列组有多个订阅者,但每个消息只由一个订阅者消费。
我们在cmd/queue-groups/main.go文件中写代码,和前面例子一样先初始化NATS客户端。
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalln(err)
}
defer nc.Close()
接下来创建主题为foo的3个队列订阅者,队列名为:queue.foo
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
log.Println("Subscribe 1:", string(msg.Data))
})
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
log.Println("Subscribe 2:", string(msg.Data))
})
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
log.Println("Subscribe 3:", string(msg.Data))
})
最后,创建一个循环像foo主题发布不同的消息,可以看出订阅者是如何消费消息的。
for i:=1; i <= 3; i++{
message := fmt.Sprintf("Message %d", i)
if err := nc.Publish("foo", []byte(message)); err != nil {
log.Fatal(err)
}
}
以下是完整代码:
package main
import (
"fmt"
"github.com/nats-io/nats.go"
"log"
"time"
)
func main() {
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
log.Println("Subscribe 1:", string(msg.Data))
})
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
log.Println("Subscribe 2:", string(msg.Data))
})
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
log.Println("Subscribe 3:", string(msg.Data))
})
for i:=1; i <= 3; i++{
message := fmt.Sprintf("Message %d", i)
if err := nc.Publish("foo", []byte(message)); err != nil {
log.Fatal(err)
}
}
time.Sleep(2 * time.Second)
}
执行结果:
可以看到消息被随机地发送到不同的订阅者。因此,在某种程度上,NATS可以作为服务的7层负载均衡器。
$ go run cmd/queue-groups/main.go
2022/05/04 12:46:06 Subscribe 3: Message 1
2022/05/04 12:46:06 Subscribe 1: Message 3
2022/05/04 12:46:06 Subscribe 2: Message 2
总结
在本文中,我们研究了不同的通信模式,展示了NATS的实时分布式消息传递功能。此外,JetStream可以与这些模式结合使用,实现持久化消息传递和消息至少一次消费策略。
网友评论