美文网首页
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式

我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式

作者: 阿兵云原生 | 来源:发表于2021-09-30 21:50 被阅读0次

    我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用

    嗨,大家好,我是小魔童哪吒,咱们从今天开始进入开源组件的学习,一边学习一边总结一边分享

    文章提纲如下:

    • RabbitMQ 成员组成
    • RabbitMQ 的六种工作模式编码

    RabbitMQ 成员组成

    • 生产者 producer
    • 消费者 consumer
    • 交换机 exchange

    用于接受、分配消息

    • 消息 message
    • 队列 queue

    用于存储生产者的消息

    • 信道 channel AMQP

    消息推送使用的通道

    • 连接 connections

    生成者或者消费者与Rabbit 建立的TCP 连接

    • 路由键 routingKey

    用于把生成者的数据分配到交换器上

    • 绑定键 BindingKey

    用于把交换器的消息绑定到队列上

    • 连接管理器 ConnectionFactory

    应用程序与 Rabbit 之间建立连接的管理器,程序代码中使用

    RabbitMQ 的六种工作模式编码

    single 模式

    • 消息产生者将消息放入队列
    • 消息的消费者监听消息队列,如果队列中有消息就消费掉

    目录如下:

    .
    ├── consumer.go
    ├── go.mod
    ├── go.sum
    ├── main.go
    └── xmtmq
        └── xmtmq.go
    

    实际编码如下:

    每种模式的编码思路如下:

    生产者 / 消费者

    • 连接 RabbitMQ 的 server
    • 初始化连接 connection
    • 初始化通道 channel
    • 初始化交换机 exchange
    • 初始化队列 queue
    • 使用路由key,绑定队列 bind , key
    • 生产消息 / 消费消息 produce , consume

    消息xmtmq.go

    package xmtmq
    
    import (
       "github.com/streadway/amqp"
       "log"
    )
    // single 模式
    // 定义 RabbitMQ 的数据结构
    // go get github.com/streadway/amqp
    
    type RabbitMQ struct {
       conn      *amqp.Connection // 连接
       channel   *amqp.Channel    // 通道
       QueueName string           // 队列名
       Exchange  string           // 交换机
       Key       string           // 路由键
       MQUrl     string           // MQ的虚拟机地址
    }
    
    // New 一个 RabbitMQ
    func NewRabbitMQ(rbt *RabbitMQ) {
       if rbt == nil || rbt.QueueName == ""  || rbt.MQUrl == "" {
          log.Panic("please check QueueName,Exchange,MQUrl ...")
       }
    
       conn, err := amqp.Dial(rbt.MQUrl)
       if err != nil {
          log.Panicf("amqp.Dial error : %v", err)
       }
       rbt.conn = conn
    
       channel, err := rbt.conn.Channel()
       if err != nil {
          log.Panicf("rbt.conn.Channel error : %v", err)
       }
       rbt.channel = channel
    }
    
    
    func RabbitMQFree(rbt *RabbitMQ){
       if rbt == nil{
          log.Printf("rbt is nil,free failed")
          return
       }
       rbt.channel.Close()
       rbt.conn.Close()
    }
    func (rbt *RabbitMQ) Init() {
       // 申请队列
       _, err := rbt.channel.QueueDeclare(
          rbt.QueueName, // 队列名
          true,          // 是否持久化
          false,         // 是否自动删除
          false,         // 是否排他
          false,         // 是否阻塞
          nil,           // 其他参数
       )
       if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
       }
    }
    
    
    // 生产消息
    
    func (rbt *RabbitMQ) Produce(data []byte) {
    
       // 向队列中加入数据
       err := rbt.channel.Publish(
          rbt.Exchange,        // 交换机
          rbt.QueueName,       // 队列名
          false,    // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
          false,    // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
          amqp.Publishing{
             ContentType: "text/plain",
             Body:        data,
          },
       )
       if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
       }
       return
    }
    
    // 消费消息
    func (rbt *RabbitMQ) Consume() {
    
       // 消费数据
       msg, err := rbt.channel.Consume(
          rbt.QueueName,    // 队列名
          "xmt",    // 消费者的名字
          true,     // 是否自动应答
          false,    // 是否排他
          false,    // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
          false,    // 是否阻塞
          nil,         // 其他属性
       )
    
       if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
       }
    
       for data := range msg {
          log.Printf("received data is %v", string(data.Body))
       }
    
    }
    

    main.go

    package main
    
    import (
       "fmt"
       "log"
       "time"
       "xmt/xmtmq"
    )
    
    /*
    RabbimtMQ single 模式 案例
    应用场景:简单消息队列的使用,一个生产者一个消费者
    生产消息
    */
    
    func main() {
        // 设置日志
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
    
       rbt := &xmtmq.RabbitMQ{
          QueueName: "xmtqueue",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
        
       xmtmq.NewRabbitMQ(rbt)
    
       var index = 0
    
       for {
           // 生产消息
          rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index)))
          log.Println("发送成功 ", index)
          index++
          time.Sleep(1 * time.Second)
       }
    
    }
    

    consumer.go

    package main
    
    import (
       "log"
       "xmt/xmtmq"
    )
    
    func main() {
    
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
    
       rbt := &xmtmq.RabbitMQ{
          QueueName: "xmtqueue",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt)
       rbt.Consume()
    }
    

    运行的时候,打开2个终端

    终端1:go run main.go

    终端2:go run consumer.go

    work 模式

    多个消费端消费同一个队列中的消息,队列采用轮询的方式将消息是平均发送给消费者,此处的资源是竞争关系

    当生产者生产消息的速度大于消费者消费的速度,就要考虑用 work 工作模式,这样能提高处理速度提高负载

    work 模式与 single 模式类似, 只是work 模式比 single 模式多了一些消费者

    基于single 模式,开一个终端3 : go run consumer.go

    publish / subscribe 模式

    publish / subscribe 发布订阅模式 , 相对于Work queues模式多了一个交换机,此处的资源是共享的

    用于场景

    • 邮件群发
    • 群聊天
    • 广播(广告等)

    目录和上述编码保持一致:

    xmtmq.go

    开始用到交换机 exchange ,fanout 类型

    生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息

    package xmtmq
    
    import (
       "github.com/streadway/amqp"
       "log"
    )
    
    // publish 模式
    // 定义 RabbitMQ 的数据结构
    // go get github.com/streadway/amqp
    
    type RabbitMQ struct {
       conn      *amqp.Connection // 连接
       channel   *amqp.Channel    // 通道
       QueueName string           // 队列名
       Exchange  string           // 交换机
       Key       string           // 路由键
       MQUrl     string           // MQ的虚拟机地址
    }
    
    // New 一个 RabbitMQ
    func NewRabbitMQ(rbt *RabbitMQ) {
       if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" {
          log.Panic("please check Exchange,MQUrl ...")
       }
    
       conn, err := amqp.Dial(rbt.MQUrl)
       if err != nil {
          log.Panicf("amqp.Dial error : %v", err)
       }
       rbt.conn = conn
    
       channel, err := rbt.conn.Channel()
       if err != nil {
          log.Panicf("rbt.conn.Channel error : %v", err)
       }
       rbt.channel = channel
    }
    
    func RabbitMQFree(rbt *RabbitMQ) {
       if rbt == nil {
          log.Printf("rbt is nil,free failed")
          return
       }
    
       rbt.channel.Close()
       rbt.conn.Close()
    }
    
    func (rbt *RabbitMQ) Init() {
       // 1、创建交换机
       err := rbt.channel.ExchangeDeclare(
          rbt.Exchange,        // 交换机
          amqp.ExchangeFanout, // 交换机类型
          true,                // 是否持久化
          false,               //是否自动删除
          false,               //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
          false,               // 是否阻塞
          nil,                 // 其他属性
       )
       if err != nil {
          log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
          return
       }
    
    }
    
    // 生产消息 publish
    
    func (rbt *RabbitMQ) PublishMsg(data []byte) {
    
       // 1、向队列中加入数据
       err := rbt.channel.Publish(
          rbt.Exchange, // 交换机
          "",           // 队列名
          false,        // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
          false,        // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
          amqp.Publishing{
             ContentType: "text/plain",
             Body:        data,
          },
       )
       if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
       }
       return
    
    }
    
    // 消费消息
    func (rbt *RabbitMQ) SubscribeMsg() {
    
       // 1、创建队列
       q, err := rbt.channel.QueueDeclare(
          "", // 此处我们传入的是空,则是随机产生队列的名称
          true,
          false,
          false,
          false,
          nil,
       )
       if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
       }
    
       // 2、绑定队列
       err = rbt.channel.QueueBind(
          q.Name,       // 队列名字
          "",           // 在publish模式下,这里key 为空
          rbt.Exchange, // 交换机名称
          false,        // 是否阻塞
          nil,          // 其他属性
       )
       if err != nil {
          log.Printf("rbt.channel.QueueBind error : %v", err)
          return
       }
    
       // 3、消费数据
       msg, err := rbt.channel.Consume(
          q.Name, // 队列名
          "xmt",  // 消费者的名字
          true,   // 是否自动应答
          false,  // 是否排他
          false,  // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
          false,  // 是否阻塞
          nil,    // 其他属性
       )
    
       if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
       }
    
       for data := range msg {
          log.Printf("received data is %v", string(data.Body))
       }
    
    }
    

    main.go

    package main
    
    import (
       "fmt"
       "log"
       "time"
       "xmt/xmtmq"
    )
    
    /*
    RabbimtMQ publish 模式 案例
    应用场景:邮件群发,群聊天,广播(广告)
    生产消息
    */
    
    func main() {
    
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
    
       rbt := &xmtmq.RabbitMQ{
          Exchange:  "xmtPubEx",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt)
       rbt.Init()
    
       var index = 0
    
       for {
          rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index)))
          log.Println("发送成功 ", index)
          index++
          time.Sleep(1 * time.Second)
       }
    
       xmtmq.RabbitMQFree(rbt)
    
    }
    

    consumer.go

    package main
    
    import (
       "log"
       "xmt/xmtmq"
    )
    
    func main() {
    
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
    
       rbt := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt)
       rbt.SubscribeMsg()
       xmtmq.RabbitMQFree(rbt)
    }
    

    执行的操作和上述保持一致

    终端1:go run main.go

    终端2:go run consumer.go

    终端3:go run consumer.go

    效果和上述single 模式和 work模式的明显区别是:发布订阅模式的案例,生产者生产的消息,对应的消费者消费其生产的内容

    routing 模式

    消息生产者将消息发送给交换机按照路由判断,路由是字符串 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息

    应用场景:从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景,例如处理错误,处理特定消息等

    生产者处理流程:

    声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定该队列的routingkey(通配符) -> 制定消息 -> 发送消息并指定routingkey(通配符)
    

    消费者处理流程:

    声明队列并声明交换机 -> 创建连接 -> 创建通道 -> 通道声明交换机 -> 通道声明队列 -> 通过通道使队列绑定到交换机并指定routingkey(通配符) -> 重写消息消费方法 -> 执行消息方法
    

    目录结构如下:

    .
    ├── consumer2.go
    ├── consumer.go
    ├── go.mod
    ├── go.sum
    ├── main.go
    └── xmtmq
        └── xmtmq.go
    

    xmtmq.go

    • 用到交换机 为 direct 类型

    • 用到路由键

    package xmtmq
    
    import (
       "github.com/streadway/amqp"
       "log"
    )
    
    // routing 模式
    // 定义 RabbitMQ 的数据结构
    // go get github.com/streadway/amqp
    
    type RabbitMQ struct {
       conn      *amqp.Connection // 连接
       channel   *amqp.Channel    // 通道
       QueueName string           // 队列名
       Exchange  string           // 交换机
       Key       string           // 路由键
       MQUrl     string           // MQ的虚拟机地址
    }
    
    // New 一个 RabbitMQ
    func NewRabbitMQ(rbt *RabbitMQ) {
       if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" {
          log.Panic("please check Exchange,,QueueName,Key,MQUrl ...")
       }
    
       conn, err := amqp.Dial(rbt.MQUrl)
       if err != nil {
          log.Panicf("amqp.Dial error : %v", err)
       }
       rbt.conn = conn
    
       channel, err := rbt.conn.Channel()
       if err != nil {
          log.Panicf("rbt.conn.Channel error : %v", err)
       }
       rbt.channel = channel
    }
    
    func RabbitMQFree(rbt *RabbitMQ) {
       if rbt == nil {
          log.Printf("rbt is nil,free failed")
          return
       }
    
       rbt.channel.Close()
       rbt.conn.Close()
    }
    
    func (rbt *RabbitMQ) Init() {
       // 1、创建交换机
       err := rbt.channel.ExchangeDeclare(
          rbt.Exchange, // 交换机
          amqp.ExchangeDirect,     // 交换机类型
          true,         // 是否持久化
          false,        //是否自动删除
          false,        //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
          false,        // 是否阻塞
          nil,          // 其他属性
       )
       if err != nil {
          log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
          return
       }
    
       // 2、创建队列
       _, err = rbt.channel.QueueDeclare(
          rbt.QueueName, // 此处我们传入的是空,则是随机产生队列的名称
          true,
          false,
          false,
          false,
          nil,
       )
       if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
       }
    
       // 3、绑定队列
       err = rbt.channel.QueueBind(
          rbt.QueueName, // 队列名字
          rbt.Key,       // routing,这里key 需要填
          rbt.Exchange,  // 交换机名称
          false,         // 是否阻塞
          nil,           // 其他属性
       )
       if err != nil {
          log.Printf("rbt.channel.QueueBind error : %v", err)
          return
       }
    
    }
    
    // 生产消息 publish
    
    func (rbt *RabbitMQ) ProduceRouting(data []byte) {
    
       // 1、向队列中加入数据
       err := rbt.channel.Publish(
          rbt.Exchange, // 交换机
          rbt.Key,      // key
          false,        // 若为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
          false,        // 若为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
          amqp.Publishing{
             ContentType: "text/plain",
             Body:        data,
          },
       )
       if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
       }
    
       return
    }
    
    // 消费消息
    func (rbt *RabbitMQ) ConsumeRoutingMsg() {
    
       // 4、消费数据
       msg, err := rbt.channel.Consume(
          rbt.QueueName, // 队列名
          "",     // 消费者的名字
          true,   // 是否自动应答
          false,  // 是否排他
          false,  // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
          false,  // 是否阻塞
          nil,    // 其他属性
       )
    
       if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
       }
    
    
       for data := range msg {
          log.Printf("received data is %v", string(data.Body))
       }
    
    }
    

    main.go

    package main
    
    import (
       "fmt"
       "log"
       "time"
       "xmt/xmtmq"
    )
    
    /*
    RabbimtMQ routing 模式 案例
    应用场景:从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景,例如处理错误,处理特定消息等
    生产消息
    */
    
    func main() {
    
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
    
       rbt1 := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx2",
          Key: "xmt1",
          QueueName: "Routingqueuexmt1",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt1)
       rbt1.Init()
    
    
       rbt2 := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx2",
          Key: "xmt2",
          QueueName: "Routingqueuexmt2",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt2)
       rbt2.Init()
    
    
       var index = 0
    
       for {
          rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1  %d ", index)))
          log.Println("发送成功xmt1  ", index)
    
          rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2  %d ", index)))
          log.Println("发送成功xmt2  ", index)
    
    
          index++
          time.Sleep(1 * time.Second)
       }
    
    
       xmtmq.RabbitMQFree(rbt1)
       xmtmq.RabbitMQFree(rbt2)
    
    }
    

    consumer.go

    package main
    
    import (
       "log"
       "xmt/xmtmq"
    )
    
    func main() {
    
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
    
       rbt := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx2",
          Key: "xmt1",
          QueueName: "Routingqueuexmt1",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt)
       rbt.ConsumeRoutingMsg()
       xmtmq.RabbitMQFree(rbt)
    }
    

    consumer2.go

    package main
    
    import (
       "log"
       "xmt/xmtmq"
    )
    
    func main() {
    
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
    
       rbt := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx2",
          Key:      "xmt2",
          QueueName: "Routingqueuexmt2",
          MQUrl:    "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt)
       rbt.ConsumeRoutingMsg()
       xmtmq.RabbitMQFree(rbt)
    }
    

    topic 模式

    话题模式,一个消息被多个消费者获取,消息的目标 queue 可用 BindingKey 的通配符

    Topics 模式实际上是路由模式的一种

    他俩的最大的区别是 : Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的

    • *号代表可以同通配一个单词
    • 号代表可以通配零个或多个单词

    编码的案例与上述 routing 模式保持一直,只是 exchange 为 topic类型

    如下是上述几种模式涉及到的交换机队列

    image image

    rpc 模式

    RPC 远程过程调用,客户端远程调用服务端的方法 ,使用 MQ 可以实现 RPC 的异步调用

    目录结构为:

    .
    ├── consumer.go
    ├── go.mod
    ├── go.sum
    ├── main.go
    └── xmtmq
        └── xmtmq.go
    
    • 客户端即是生产者也是消费者,向 RPC 请求队列发送 RPC 调用消息,同时监听RPC响应队列
    • 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
    • 服务端将RPC方法 的结果发送到RPC响应队列。
    • 客户端监听RPC响应队列,接收到RPC调用结果

    xmtmq.go

    package xmtmq
    
    import (
       "github.com/streadway/amqp"
       "log"
       "math/rand"
    )
    
    // rpc 模式
    // 定义 RabbitMQ 的数据结构
    // go get github.com/streadway/amqp
    
    type RabbitMQ struct {
       conn      *amqp.Connection // 连接
       channel   *amqp.Channel    // 通道
       QueueName string           // 队列名
       Exchange  string           // 交换机
       Key       string           // 路由键
       MQUrl     string           // MQ的虚拟机地址
    }
    
    // New 一个 RabbitMQ
    func NewRabbitMQ(rbt *RabbitMQ) {
       if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
          log.Panic("please check QueueName,Exchange,MQUrl ...")
       }
    
       conn, err := amqp.Dial(rbt.MQUrl)
       if err != nil {
          log.Panicf("amqp.Dial error : %v", err)
       }
       rbt.conn = conn
    
       channel, err := rbt.conn.Channel()
       if err != nil {
          log.Panicf("rbt.conn.Channel error : %v", err)
       }
       rbt.channel = channel
    }
    
    func RabbitMQFree(rbt *RabbitMQ) {
       if rbt == nil {
          log.Printf("rbt is nil,free failed")
          return
       }
       rbt.channel.Close()
       rbt.conn.Close()
    }
    
    // 生产消息
    
    func (rbt *RabbitMQ) Produce(data []byte) {
    
       // 申请队列
       q, err := rbt.channel.QueueDeclare(
          rbt.QueueName, // 队列名
          true,          // 是否持久化
          false,         // 是否自动删除
          false,         // 是否排他
          false,         // 是否阻塞
          nil,           // 其他参数
       )
       if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
       }
    
       err = rbt.channel.Qos(1, 0, false)
       if err != nil {
          log.Printf("rbt.channel.Qos error : %v", err)
          return
       }
    
       d, err := rbt.channel.Consume(
          q.Name,
          "",
          false,
          false,
          false,
          false,
          nil)
       if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
       }
    
       for msg := range d {
          log.Println("received msg is  ", string(msg.Body))
          err := rbt.channel.Publish(
             "",
             msg.ReplyTo,
             false,
             false,
             amqp.Publishing{
                ContentType:   "test/plain",
                CorrelationId: msg.CorrelationId,
                Body:          data,
             })
          if err != nil {
             log.Printf("rbt.channel.Publish error : %v", err)
             return
          }
          msg.Ack(false)
          log.Println("svr response ok ")
       }
    
       return
    }
    func randomString(l int) string {
       bytes := make([]byte, l)
       for i := 0; i < l; i++ {
          bytes[i] = byte(rand.Intn(l))
       }
       return string(bytes)
    }
    
    // 消费消息
    func (rbt *RabbitMQ) Consume() {
    
       // 申请队列
       q, err := rbt.channel.QueueDeclare(
          "",    // 队列名
          true,  // 是否持久化
          false, // 是否自动删除
          false, // 是否排他
          false, // 是否阻塞
          nil,   // 其他参数
       )
       if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
       }
    
       // 消费数据
       msg, err := rbt.channel.Consume(
          q.Name, // 队列名
          "xmt",  // 消费者的名字
          true,   // 是否自动应答
          false,  // 是否排他
          false,  // 若为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
          false,  // 是否阻塞
          nil,    // 其他属性
       )
       if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
       }
       id := randomString(32)
       err = rbt.channel.Publish(
          "",
          rbt.QueueName,
          false,
          false,
          amqp.Publishing{
             ContentType:   "test/plain",
             CorrelationId: id,
             ReplyTo:       q.Name,
             Body:          []byte("321"),
          })
       if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
       }
    
       for data := range msg {
          log.Printf("received data is %v", string(data.Body))
       }
    }
    

    main.go

    package main
    
    import (
       "fmt"
       "log"
       "xmt/xmtmq"
    )
    
    /*
    RabbimtMQ rpc 模式 案例
    应用场景:简单消息队列的使用,一个生产者一个消费者
    生产消息
    */
    
    func main() {
    
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
    
       rbt := &xmtmq.RabbitMQ{
          QueueName: "xmtqueue",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt)
    
       rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))
    
    }
    

    consumer.go

    package main
    
    import (
       "log"
       "math/rand"
       "time"
       "xmt/xmtmq"
    )
    
    func main() {
    
       log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
       rand.Seed(time.Now().UTC().UnixNano())
       rbt := &xmtmq.RabbitMQ{
          QueueName: "xmtqueue",
          MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",
       }
    
       xmtmq.NewRabbitMQ(rbt)
       rbt.Consume()
    }
    

    咱们先运行消费者,多运行几个,可以看到咱们的队列中已经有数据了,咱们运行的是2个消费者,因此此处是 2

    image

    再运行生产者,就能看到生产者将消费者发送的消息消费掉,并且通过 CorrelationId 找到对应消费者监听的队列,将数据发送到队列中

    消费者监听的队列有数据了,消费者就取出来进行消费

    总结

    RabbitMQ 的六种工作模式:

    • single 模式
    • work 模式
    • publish / subscribe 模式
    • routing 模式
    • topic 模式
    • rpc 模式

    参考资料:

    RabbitMQ Tutorials

    欢迎点赞,关注,收藏

    朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力

    image

    好了,本次就到这里

    技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。

    我是小魔童哪吒,欢迎点赞关注收藏,下次见~

    相关文章

      网友评论

          本文标题:我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式

          本文链接:https://www.haomeiwen.com/subject/iecynltx.html