美文网首页
基于Go的Rabbitmq实践

基于Go的Rabbitmq实践

作者: 正在修炼的西瓜君 | 来源:发表于2019-04-16 15:49 被阅读0次

    学会使用消息队列是后端程序员进阶的必备技能之一,消息队列可以异步处理请求,缓解系统的压力,从而达到解耦、削峰等目的,大大提高系统的可用性以及扩展性。

    Rabbitmq是使用Erlang语言实现AMQP协议的消息中间件,具有易用、高扩展、高可用、持久化等方面特点,由于成熟优秀的表现和拥有活跃的文档跟社区,Rabbitmq成为很多人开发消息队列的首选。

    环境安装

    参考https://blog.csdn.net/u013219624/article/details/83412925

    1、安装erlang

    由于是基于Erlang开发的,所以必须先安装Erlang环境,从官网可以看到安装源码包,使用wget下载的话会很慢,推荐使用三方下载器下载,然后通过ssh工具传到服务器上,我的软件下载目录一般是 /usr/local/,这里就安装在 /usr/local/erlang。

    然后解压安装包,安装。

    # wget http://erlang.org/download/otp_src_21.3.tar.gz
    # tar -xvzf otp_src_21.3.tar.gz
    # cd otp_src_21.3
    # ./configure --prefix=/usr/local/erlang --without-javac
    # make && make install
    

    验证一下是否安装成功,切换到 /usr/local/erlang目录,执行下方指令即可看到。

    # bin/erl
    

    别忘了配置环境变量,打开 /etc/profile 进行如下编辑

    export ERLPATH=/usr/local/erlang
    export PATH=$PATH:$ERLPATH/bin
    

    修改好之后重新生效配置

    # source /etc/profile
    

    现在可以直接用 # erl 指令运行erlang了。

    2.安装Rabbitmq

    Rabitmq安装比较省力,从官方提供的下载地址中下载rpm安装包,直接安装即可。

    # wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
    # yum -y install rabbitmq-server-3.6.15-1.el7.noarch.rpm
    

    查看是否安装成功,首先启动服务,然后查看服务状态。

    # service rabbitmq-server start
    # service rabbitmq-server status
    

    以下是Rabbitmq其它常用指令:

    // 停止服务
    # service rabbitmq-server stop
    // 重启服务
    # service rabbitmq-server restart
    // 设置开机启动
    # chkconfig rabbitmq-server on
    // 启动web管理插件(127.0.0.1:15672)
    # rabbitmq-plugins enable rabbitmq_management
    // 查看用户列表
    # rabbitmqctl list_users
    // 添加用户
    # rabbitmqctl add_user admin 123456
    // 设置用户角色
    # rabbitmqctl set_user_tags admin administrator
    // 设置权限
    # rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
    // 删除用户
    # rabbitmqctl delete_user admin
    // 修改用户密码
    # rabbitmqctl change_password admin 123456
    

    默认的用户密码是 guest guest,但只限本地访问,远程访问的话必须先创建一个新用户,指定为管理员角色,分配管理员权限,就可以登录管理后台进行操作了。

    下图就是登录后的管理界面,这上面可以看到节点、端口、交换机、队列等信息,还可以手动发送消息,管理消息,十分方便。

    名词解释

    1、基础模块

    要熟练使用Rabbitmq就必须知道其名词含义和工作机制

    基本流程

    基本的工作流程是这样的:生产者,就是你的发送程序,通过TCP连接,创建channel(通道)向指定的exchange(交换机)发送一个消息,exchange再将消息下发到binding(绑定)的queue(队列)中,然后消费者(处理程序)监听接收queue中的消息进行处理。

    这是google的一张流程图

    生产者,消费者

    即发送消息和接收处理消息的逻辑程序

    Channel

    通道,rabbitmq的本质是tcp通信,利用tcp连接创建内部的逻辑连接,注意,此通道不是tcp本身通道(tcp一个连接就是一个通道),而是共享一个tcp连接的其内部实现的连接,至于rabbitmq内部如何实现的我也没吃透,应该是用到了多路复用,总之rabbitmq一切收发都是通过channel实现的,避免了重复连接tcp产生的资源消耗。

    Exchange

    交换机,相当于是一个消息中转控制中心,负责接收消息然后根据路由规则将消息下发到指定的queue。

    Queue

    队列,即存放消息的地方,消费的时候直接从队列里取。

    2、参数说明

    Routing Key

    路由键,是exchange跟queue之间的桥梁,exchange根据绑定的routing key下发消息到对应的queue中,决定了消息的流向,键名可以自定义。

    Type

    exchange的类型,有'fanout'、'direct'、'topic'、'headers'四个类型。

    • fanout:不需要指定路由键,直接将消息发送给exchange中的所有queue,类似于广播。
    • direct:将消息发给exchange中指定路由键的queue中,相当于精准投放。
    • topic:匹配模式,消息下发到匹配规则的routing key的queue中,有'*'与'#'两个通配符,'*'表示只匹配一个词,'#'表示匹配多个,比如'user.*'只能匹配到'user.name'而不能匹配到'user.name.wang','user.#'则都可以匹配到。
    • headers:根据消息体的headers匹配,这种用到的比较少,绑定的时候指定相关header参数即可。

    Durable

    exchange跟queue都有这个参数,类型为boolean,表示是否持久化。

    Auto delete

    exchange跟queue都有这个参数,类型为boolean,我试了一下,当exchange绑定的queue全都解绑的时候exchange会自动删除,queue好像没什么影响。

    Internal

    exchange有这个参数,类型为boolean,内部的,意味着不能对这个exchange发送消息,通过管理后台还是可以发送消息的。

    noWait

    几乎每个步骤都有这个参数,类型为boolean,不需要服务器任何返回值的意思,指服务端创建队列发送消息等,rabbitmq不需要这个返回状态即可进行下一步,正常来说不会用到这个参数,容易报异常。

    Exclusive

    queue有这个参数,类型为boolean,排他队列,只对创建该队列的用户可见,其它用户无法访问。

    延伸扩展

    rabbitmq还提供了很多扩展参数,比如'x-message-ttl'给消息设置过时时间,'x-max-length-bytes'设置消息最大长度,'x-dead-letter-exchange'设置消息过时后推送到的exchange等等,具体的官方文档也提供了,也可以看管理后台创建exchange、queue的时候会有提示的额外参数。

    编程实践

    使用Go语言操作Rabbitmq需要用到这个库:https://github.com/streadway/amqp,这是一个带Rabbitmq扩展的AMQP客户端。

    下面就一步步解析下基于go的rabbitmq收发过程。

    1、建立连接

    上面已经说过了,其本质是tcp链接,并且是基于内部通道进行的通信,所以一个完整的连接分为连接与创建通道两部分。

    连接地址的格式是这种形式:amqp://admin:123456@127.0.0.1:5672/

    // 建立连接
    connection, err := amqp.Dial(uri)
    if err != nil {
        log.Println("Failed to connect to RabbitMQ:", err.Error())
        return err
    }
    defer connection.Close()
    // 创建一个Channel
    channel, err := connection.Channel()
    if err != nil {
        log.Println("Failed to open a channel:", err.Error())
        return err
    }
    defer channel.Close()
    
    2、声明exchange

    首先声明需要发送到的exchange,如果此exchange不存在将会被自动创建。

    // 声明 exchange
    if err := channel.ExchangeDeclare(
        exchange, //name
        "direct", //exchangeType
        true,     //durable
        false,    //auto-deleted
        false,    //internal
        false,    //noWait
        nil,      //arguments
    ); err != nil {
        log.Println("Failed to declare a exchange:", err.Error())
        return err
    }
    

    这里声明的exchange类型为'direct',精准投放模式,持久化,这也是最常用的配置。

    3、声明queue

    同样,queue也需要先声明,不存在的也会被自动创建。

    // 声明一个queue
    if _, err := channel.QueueDeclare(
        queue, // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    ); err != nil {
        log.Println("Failed to declare a queue:", err.Error())
        return err
    }
    

    此队列为持久化队列,以上也是最常用的配置。

    4、绑定queue

    创建好exchange和queue之后,需要建立两者的联系,即绑定,第二个参数就是指定的 routing_key。

    // exchange 绑定 queue
    channel.QueueBind(queue, routing_key, exchange, false, nil)
    
    5、发送消息

    万事俱备,只剩发送了,指定需要发送的exchange跟'routing_key',设置好发送的消息体就可以将消息发送出去了,为什么不用指定queue呢,因为消息是先投放到exchange的,exchange会自动根据绑定的规则将消息下发到对应的queue。

    // 发送
    messageBody := comhelper.JsonEncode(content)
    if err = channel.Publish(
        exchange,    // exchange
        routing_key, // routing key
        false,       // mandatory
        false,       // immediate
        amqp.Publishing{
            Headers:         amqp.Table{},
            ContentType:     "text/plain",
            ContentEncoding: "",
            Body:            []byte(messageBody),
            //Expiration:      "60000", // 消息过期时间
        },
    ); err != nil {
        log.Println("Failed to publish a message:", err.Error())
        return err
    }
    

    消息体的设置可以去程序中看'Publishing'的定义,有优先级、过期时间等诸多设置。

    type Publishing struct {
        // Application or exchange specific fields,
        // the headers exchange will inspect this field.
        Headers Table
    
        // Properties
        ContentType     string    // MIME content type
        ContentEncoding string    // MIME content encoding
        DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
        Priority        uint8     // 0 to 9
        CorrelationId   string    // correlation identifier
        ReplyTo         string    // address to to reply to (ex: RPC)
        Expiration      string    // message expiration spec
        MessageId       string    // message identifier
        Timestamp       time.Time // message timestamp
        Type            string    // message type name
        UserId          string    // creating user id - ex: "guest"
        AppId           string    // creating application id
    
        // The application specific payload of the message
        Body []byte
    }
    

    运行程序,发送一条消息,我们可以通过管理后台看到发送的消息,这时候登录管理控制台,会发现exchange、queue以及绑定关系都被自动创建了。

    来到'Queues'面板,找到下面的'Get messages',点击按钮就能看到消息了。

    6、消费消息

    消息的消费也是从连接并且创建通道开始的,不过消费者不需要声明exchange,因为它是直接从queue中取消息的,所以只声明一个queue即可,注意配置需要跟生产者一样。

    其核心代码是注册消费者。

    // 注册消费者
    msgs, err := ch.Consume(
        q.Name, // queue
        "project", // 标签
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Println("Failed to register a consumer:", err.Error())
        return err
    }
    

    同样我们也可以在管理控制台的当前queue面板中看到'Consumers'信息。

    7、完整代码

    生产者

    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "helper_go/comhelper"
        "log"
    )
    
    func main() {
        uri := "amqp://admin:123456@127.0.0.1:5672/"
        exchange := "project"
        queue := "pj_event"
        routing_key := "pj_event"
        content := map[string]interface{}{
            "name": "zelda",
        }
    
        err := Pub_mq(uri, exchange, queue, routing_key, content)
        fmt.Println(err)
    }
    
    // 生产者
    func Pub_mq(uri, exchange, queue, routing_key string, content map[string]interface{}) error {
        // 建立连接
        connection, err := amqp.Dial(uri)
        if err != nil {
            log.Println("Failed to connect to RabbitMQ:", err.Error())
            return err
        }
        defer connection.Close()
        // 创建一个Channel
        channel, err := connection.Channel()
        if err != nil {
            log.Println("Failed to open a channel:", err.Error())
            return err
        }
        defer channel.Close()
    
        // 声明exchange
        if err := channel.ExchangeDeclare(
            exchange, //name
            "direct", //exchangeType
            true,     //durable
            false,    //auto-deleted
            false,    //internal
            false,    //noWait
            nil,      //arguments
        ); err != nil {
            log.Println("Failed to declare a exchange:", err.Error())
            return err
        }
        // 声明一个queue
        if _, err := channel.QueueDeclare(
            queue, // name
            true,  // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        ); err != nil {
            log.Println("Failed to declare a queue:", err.Error())
            return err
        }
        // exchange 绑定 queue
        channel.QueueBind(queue, routing_key, exchange, false, nil)
    
        // 发送
        messageBody := comhelper.JsonEncode(content)
        if err = channel.Publish(
            exchange,    // exchange
            routing_key, // routing key
            false,       // mandatory
            false,       // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(messageBody),
                //Expiration:      "60000", // 消息过期时间
            },
        ); err != nil {
            log.Println("Failed to publish a message:", err.Error())
            return err
        }
        return nil
    }
    
    

    消费者

    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "log"
    )
    
    func main() {
        uri := "amqp://admin:123456@127.0.0.1:5672/"
        exchange := "project"
        queue := "pj_event"
    
        err := Use_mq(uri, exchange, queue)
        fmt.Println(err)
    }
    
    // 消费者
    func Use_mq(uri, exchange, queue string) error {
        // 建立连接
        conn, err := amqp.Dial(uri)
        if err != nil {
            log.Println("Failed to connect to RabbitMQ:", err.Error())
            return err
        }
        defer conn.Close()
        // 启动一个通道
        ch, err := conn.Channel()
        if err != nil {
            log.Println("Failed to open a channel:", err.Error())
            return err
        }
    
        // 声明一个队列
        q, err := ch.QueueDeclare(
            queue, // name
            true,  // durable
            false, // delete when usused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        if err != nil {
            log.Println("Failed to declare a queue:", err.Error())
            return err
        }
        // 注册消费者
        msgs, err := ch.Consume(
            q.Name,    // queue
            "project", // 标签
            true,      // auto-ack
            false,     // exclusive
            false,     // no-local
            false,     // no-wait
            nil,       // args
        )
        if err != nil {
            log.Println("Failed to register a consumer:", err.Error())
            return err
        }
    
        forever := make(chan bool)
        go func() {
            for d := range msgs {
                log.Println(d.Type)
                log.Println(d.MessageId)
                log.Printf("Received a message: %s", d.Body)
            }
        }()
        log.Printf("Waiting for messages. To exit press CTRL+C")
        <-forever
    
        return nil
    }
    
    

    高级应用

    现在我们有一个需求,要求给消息设置过期时间,过期的消息放到另一个队列进行额外的处理,这个队列就叫死信队列。

    先来看消息过期时间的设置,有两种方法,一种是声明队列的时候设置x-message-ttl参数,这样这个队列中的消息都会有一个过期时间;还有一种就是发送消息的时候单独给这条消息设置过期时间,即Expiration参数,如果两个参数都设置了,那么以时间短的那个为准。

    来做个准备工作,先删除之前创建的'pj_event'队列,然后再控制台手动创建一个名为'dead'的exchange,和一个名为'de_event'的队列,将两者绑定。

    然后改造生产者声明queue的代码,如下:

    // 声明一个queue
    args := amqp.Table{
        "x-message-ttl":             int64(60000),
        "x-dead-letter-exchange":    "dead",
        "x-dead-letter-routing-key": "",
    }
    if _, err := channel.QueueDeclare(
        queue, // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        args,  // arguments
    ); err != nil {
        log.Println("Failed to declare a queue:", err.Error())
        return err
    }
    

    其中x-message-ttl参数设置过期时间,必须是int64格式,x-dead-letter-exchange设置消息过期后下发的交换机,x-dead-letter-routing-key参数必须指定,如果私信队列绑定的时候没有routing key为空就好。

    运行代码,从控制台可以看到两条队列的消息情况。

    可以看到'pj_event'的消息条数为1,'de_event'死信队列的消息为0,静待一分钟之后再次观察。

    这时候'pj_event'中已经没有消息了,而'de_event'中多出了一条消息,点进'de_event',查看具体的消息内容。

    可以看到消息的内容以及来源。

    结语

    Rabbitmq的介绍告一段落,通过本教程相信应该可以领大家入门了,更多的功能我也在研究中,总之,学会使用队列是后端程序员进阶的必备知识,可一定要掌握呀。

    相关文章

      网友评论

          本文标题:基于Go的Rabbitmq实践

          本文链接:https://www.haomeiwen.com/subject/xnxoiqtx.html