美文网首页
基于NATS实现分布式通信模式

基于NATS实现分布式通信模式

作者: Go语言由浅入深 | 来源:发表于2022-05-04 13:41 被阅读0次

    在分布式应用中经常需要实现服务间的通信,本文我们使用NATS消息中间件来实现服务间的不同通信方式。

    准备工作

    首先创建一个Go项目。注意:本文所介绍的例子运行在Linux/MacOS操作系统环境,但NATS也支持windows系统。

    go mod init example
    

    安装nats包:

    go get  github.com/nats-io/nats.go/@latest
    

    我们将使用以下目录结构:

    .
    ├── cmd
    │   ├── publish-subscribe
    │   │   └── main.go
    │   ├── request-reply
    │   │   └── main.go
    │   └── queue-groups
    │       └── main.go
    ├── go.mod
    └── go.sum
    

    启动本地nats服务:

    docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats
    

    发布订阅模式

    发布订阅模式

    NATS实现了消息的发布和订阅一对多模式。发布者在一个主题上发送消息,在该主题上的任何订阅者都可以收到消息。这种1:N一对多模式也称为:fan-out。
    订阅者还可以在主题中使用通配符,优点类似正则表达式。例如:

    • foo.*可以匹配foo.bar和foo.baz。
    • fo o.*.bar匹配foo.a.bar和foo.b.bar。
    • foo.>匹配上面所有主题。
      消息大小有限制(在nats服务的max_payload配置参数中设置)。默认是1MB,但可以设置最大为64MB。但NATS开发团队推荐最大值设置小点比如8MB。

    为什么需要这种通信模式?

    发布订阅是很常见的使用场景,可以用于发送消息到不同的服务。

    代码

    我们在cmd/publish-subscribe/main.go文件中写该模式代码,首先初始化NATS客户端。

    nc, err := nats.Connect(nats.DEFAULT_ENCODER)
        if err != nil {
            log.Fatal(err)
        }
        defer nc.Close()
    

    使用3个订阅者订阅foo主题,可以实现一个fan-out模式。

    nc.Subscribe("foo", func(msg *nats.Msg) {
            log.Println("Subscribe 1:", string(msg.Data))
        })
    
        nc.Subscribe("foo", func(msg *nats.Msg) {
            log.Println("Subscribe 2:", string(msg.Data))
        })
    
        nc.Subscribe("foo", func(msg *nats.Msg) {
            log.Println("Subscribe 3:", string(msg.Data))
        })
    

    向foo主题发布消息并等待。

    if err := nc.Publish("foo", []byte("Here's some stuff")); err != nil {
            log.Fatal(err)
        }
        time.Sleep(2 * time.Second)
    

    完整例子如下,NATS发布消息非常简单。

    package main
    
    import (
        "github.com/nats-io/nats.go"
        "log"
        "time"
    )
    
    func main() {
        nc, err := nats.Connect("nats://localhost:4222")
        if err != nil {
            log.Fatal(err)
        }
        defer nc.Close()
    
        nc.Subscribe("foo", func(msg *nats.Msg) {
            log.Println("Subscribe 1:", string(msg.Data))
        })
    
        nc.Subscribe("foo", func(msg *nats.Msg) {
            log.Println("Subscribe 2:", string(msg.Data))
        })
    
        nc.Subscribe("foo", func(msg *nats.Msg) {
            log.Println("Subscribe 3:", string(msg.Data))
        })
    
        if err := nc.Publish("foo", []byte("Here's some stuff")); err != nil {
            log.Fatal(err)
        }
        time.Sleep(2 * time.Second)
    }
    

    执行结果

    如你所见,消息被发送到所有的订阅者。

    go run cmd/publish-subscribe/main.go
    2022/05/04 12:07:44 Subscribe 3: Here's some stuff
    2022/05/04 12:07:44 Subscribe 1: Here's some stuff
    2022/05/04 12:07:44 Subscribe 2: Here's some stuff
    

    请求应答模式

    请求应答模式

    请求应答(Request-Reply)在分布式系统中也是很常见的通信模式。客户端发送一个请求,会在一定时间内异步等待接收应答消息。
    NATS使请求-应答变得简单而强大,并支持一些强大的特性,比如位置透明、扩和缩容、可观察性等等。

    为什么需要这种模式?

    有时服务间需要一对一的通信,请求应答就非常适合。

    代码

    在cmd/reques-reply/main.go文件中写该模式的代码,还是以初始化NATS客户端代码开始:

    nc, err := nats.Connect(nats.DefaultURL)
    
    if err != nil {
        log.Fatalln(err)
    }
    
    defer nc.Close()
    

    订阅foo主题,添加一些日志并对接收到消息时提供应答:

    nc.Subscribe("foo", func(msg *nats.Msg) {
            log.Println("Request receive:", string(msg.Data))
    
            msg.Respond([]byte("Here you go"))
        })
    

    我们还可以使用不同的应答主题,客户端可以向只响应特定请求者的服务发出请求,创建1对1的关系。
    下面使用NATS客户端的Request方法。包含三个参数:主题、请求内容(字节数组)、请求超时时间。
    以下是完整代码:

    package main
    
    import (
        "github.com/nats-io/nats.go"
        "log"
        "time"
    )
    
    func main() {
        nc, err := nats.Connect("nats://localhost:4222")
        if err != nil {
            log.Fatal(err)
        }
        defer nc.Close()
    
        nc.Subscribe("foo", func(msg *nats.Msg) {
            log.Println("Request receive:", string(msg.Data))
    
            msg.Respond([]byte("Here you go"))
        })
    
        reply, err := nc.Request("foo", []byte("Give me data"), 10*time.Second)
        if err != nil {
            log.Fatal(err)
        }
        log.Println("Got reply:", string(reply.Data))
    }
    

    执行结果

    正如预期的那样,我们的请求收到了,订阅者用一些数据响应了请求。

    $ go run cmd/request-reply/main.go
    2022/05/04 12:25:21 Request receive: Give me data
    2022/05/04 12:25:21 Got reply: Here you go
    
    队列订阅模式
    队列订阅模式

    NATS提供了一种称为分布式队列功能内置负载平衡。使用队列订阅者将在一组订阅者之间平衡消息发送,这组订阅者可用于提供应用程序容错和扩展工作负载。

    为什么需要这种模式

    队列订阅是扩展服务的理想选择。扩展就和运行一个新应用程序一样简单,缩容可以向正在运行的应用发送信号来停止服务。这种灵活性和无需任何配置更改特点使NATS成为一种优秀的服务通信技术,可以与所有平台技术一起使用。NATS的一个重要特性是,队列组由应用程序及其队列订阅者组成,而不是在服务器端配置。

    代码

    要创建一个订阅队列,订阅者需要注册一个队列名。所有包含相同队列名的订阅者组成一个组。无需任何配置。当发布已注册主题的消息时,NATS服务从订阅组中随机选择一个成员接收消息。尽管队列组有多个订阅者,但每个消息只由一个订阅者消费。

    我们在cmd/queue-groups/main.go文件中写代码,和前面例子一样先初始化NATS客户端。

    nc, err := nats.Connect(nats.DefaultURL)
    
    if err != nil {
        log.Fatalln(err)
    }
    
    defer nc.Close()
    

    接下来创建主题为foo的3个队列订阅者,队列名为:queue.foo

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
            log.Println("Subscribe 1:", string(msg.Data))
        })
    
        nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
            log.Println("Subscribe 2:", string(msg.Data))
        })
    
        nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
            log.Println("Subscribe 3:", string(msg.Data))
        })
    

    最后,创建一个循环像foo主题发布不同的消息,可以看出订阅者是如何消费消息的。

    for i:=1; i <= 3; i++{
            message := fmt.Sprintf("Message %d", i)
    
            if err := nc.Publish("foo", []byte(message)); err != nil {
                log.Fatal(err)
            }
        }
    

    以下是完整代码:

    package main
    
    import (
        "fmt"
        "github.com/nats-io/nats.go"
        "log"
        "time"
    )
    
    func main() {
        nc, err := nats.Connect("nats://localhost:4222")
        if err != nil {
            log.Fatal(err)
        }
        defer nc.Close()
    
        nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
            log.Println("Subscribe 1:", string(msg.Data))
        })
    
        nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
            log.Println("Subscribe 2:", string(msg.Data))
        })
    
        nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
            log.Println("Subscribe 3:", string(msg.Data))
        })
    
        for i:=1; i <= 3; i++{
            message := fmt.Sprintf("Message %d", i)
    
            if err := nc.Publish("foo", []byte(message)); err != nil {
                log.Fatal(err)
            }
        }
    
        time.Sleep(2 * time.Second)
    }
    

    执行结果:

    可以看到消息被随机地发送到不同的订阅者。因此,在某种程度上,NATS可以作为服务的7层负载均衡器。

    $ go run cmd/queue-groups/main.go
    2022/05/04 12:46:06 Subscribe 3: Message 1
    2022/05/04 12:46:06 Subscribe 1: Message 3
    2022/05/04 12:46:06 Subscribe 2: Message 2
    

    总结

    在本文中,我们研究了不同的通信模式,展示了NATS的实时分布式消息传递功能。此外,JetStream可以与这些模式结合使用,实现持久化消息传递和消息至少一次消费策略。

    相关文章

      网友评论

          本文标题:基于NATS实现分布式通信模式

          本文链接:https://www.haomeiwen.com/subject/vkmwyrtx.html