美文网首页
Golang RabbitMQ发布订阅模式(广播模式、fanou

Golang RabbitMQ发布订阅模式(广播模式、fanou

作者: 一位先生_ | 来源:发表于2022-06-14 09:49 被阅读0次

    golang RabbitMQ发布订阅模式(广播模式、fanout模式),就是一个生产者发送的消息会被多个消费者处理。

    image.png

    说明:

    • P 代表生产者 , C1、C2 代表消费者,红色代表队列, X代表交换机(Exchange)。
    • 交换机(Exchange)负责将消息转发至绑定交换机的所有队列。
    • 可以定义多个队列,分别绑定同一个交换机。
    • 每个队列可以有一个或者多个消费者。

    1.安装依赖包

    go get github.com/streadway/amqp
    
    

    2.发送消息

    下面分步骤演示消息生产者如何发送消息

    2.1. 连接RabbitMQ Server

    // 连接RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    defer conn.Close()
    
    

    连接地址说明:

    amqp://账号:密码@RabbitMQ地址:端口/
    
    

    2.2. 创建Channel

    大部分操作都是在Channel(信道 )完成的。

    ch, err := conn.Channel()
    defer ch.Close()
    
    

    2.3. 声明交换机

    消息先发送到交换机(Exchange),由交换机根据策略转发消息到队列。

    err = ch.ExchangeDeclare(
            "tizi365",   // 交换机名字
            "fanout", // 交换机类型,这里使用fanout类型,即: 发布订阅模式
            true,     // 是否持久化
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
    
    

    2.4. 推送消息

    // 消息内容
    body := "Hello Tizi365.com!"
    
    // 推送消息
    err = ch.Publish(
      "tizi365",     // exchange(交换机名字,跟前面声明对应)
      "", // 路由参数,fanout类型交换机,自动忽略路由参数,填了也没用。
      false,  // mandatory
      false,  // immediate
      amqp.Publishing {
        ContentType: "text/plain", // 消息内容类型,这里是普通文本
        Body:        []byte(body),  // 消息内容
      })
    
    

    2.5.完整的消息推送代码

    package main
    
    import (
        "log"
        "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
        }
    }
    
    func main() {
        // 连接rabbitmq
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        // 创建信道
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
    
        // 声明交换机
        err = ch.ExchangeDeclare(
            "tizi365",   // 交换机名字
            "fanout", // 交换机类型,fanout发布订阅模式
            true,     // 是否持久化
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")
    
        // 消息内容
        body := "Hello Tizi365.com!"
        // 推送消息
        err = ch.Publish(
            "tizi365",     // exchange(交换机名字,跟前面声明对应)
            "", // 路由参数,fanout类型交换机,自动忽略路由参数,填了也没用。
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain", // 消息内容类型,这里是普通文本
                Body:        []byte(body),  // 消息内容
            })
    
        log.Printf("发送内容 %s", body)
    }
    
    

    3.接收消息

    接收消息前面三个步骤:连接RabbitMQ、创建信道、声明交换机跟发送消息一样,参考前面2.1、2.2、2.3章节即可。

    3.1.声明队列

    声明需要操作的队列

    q, err := ch.QueueDeclare(
            "",    // 队列名字,不填则随机生成一个
            false, // 是否持久化队列
            false, // delete when unused
            true,  // exclusive
            false, // no-wait
            nil,   // arguments
        )
    
    

    3.2.队列绑定交换机

    队列需要绑定到交换机才能接收到消息

    err = ch.QueueBind(
            q.Name, // 队列名
            "",     // 路由参数,fanout类型交换机,自动忽略路由参数
            "tizi365", // 交换机名字,需要跟消息发送端定义的交换器保持一致
            false,
            nil)
    
    

    提示:实际应用中,我们可以定义N个队列,分别绑定到同一个交换机上,就可以接收交换机转发过来的消息,这就是发布订阅模式体现的地方。

    3.3.创建消费者

    msgs, err := ch.Consume(
            q.Name, // 引用前面的队列名
            "",     // 消费者名字,不填自动生成一个
            true,   // 自动向队列确认消息已经处理
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
    
    // 循环处理消息
    for d := range msgs {
                log.Printf("接收消息=%s", d.Body)
            }
    
    

    3.4.完整消费者代码

    package main
    
    import (
        "log"
    
        "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
        }
    }
    
    func main() {
        // 连接rabbitmq
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        // 创建信道,通常一个消费者一个
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
    
        // 声明交换机
        err = ch.ExchangeDeclare(
            "tizi365",   // 交换机名,需要跟消息发送方保持一致
            "fanout", // 交换机类型
            true,     // 是否持久化
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")
    
        // 声明需要操作的队列
        q, err := ch.QueueDeclare(
            "",    // 队列名字,不填则随机生成一个
            false, // 是否持久化队列
            false, // delete when unused
            true,  // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        // 队列绑定指定的交换机
        err = ch.QueueBind(
            q.Name, // 队列名
            "",     // 路由参数,fanout类型交换机,自动忽略路由参数
            "tizi365", // 交换机名字,需要跟消息发送端定义的交换器保持一致
            false,
            nil)
        failOnError(err, "Failed to bind a queue")
    
        // 创建消费者
        msgs, err := ch.Consume(
            q.Name, // 引用前面的队列名
            "",     // 消费者名字,不填自动生成一个
            true,   // 自动向队列确认消息已经处理
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
    
        // 循环消费队列中的消息
        for d := range msgs {
            log.Printf("接收消息=%s", d.Body)
        }
    }
    
    

    3.5.多个消费者

    相关文章

      网友评论

          本文标题:Golang RabbitMQ发布订阅模式(广播模式、fanou

          本文链接:https://www.haomeiwen.com/subject/lmifsrtx.html