美文网首页RabbitMQGo
RabbitMQ入门学习3 - Go Package实现

RabbitMQ入门学习3 - Go Package实现

作者: 红薯爱帅 | 来源:发表于2021-05-02 10:26 被阅读0次

    1. 概述

    RabbitMQ在生产环境的应用Tips,基于Golang。

    go with rabbitmq

    2. 基础功能

    基础功能包含Exchange、Queue的Declare,如何Consume和Publish,以及对Delivery的操作(Delivery.Ack, Delivery.Reject or Delivery.Nack)。

    参考:https://github.com/zspishere/rabbitmq-demo

    这里增加两个必要的RabbitMQ Error Catch,可以实现重连或Queue重建等操作。

    Catch Connection Error

    import "github.com/streadway/amqp"
    
    chConnErr chan *amqp.Error
    chConnErr = make(chan *amqp.Error)
    
    amqpConn.NotifyClose(chConnErr)
    

    Catch Channel Error

    import "github.com/streadway/amqp"
    
    chNotifyClose       chan *amqp.Error
    chNotifyReturn      chan amqp.Return
    chNotifyCancel      chan string
    
    chNotifyClose:       make(chan *amqp.Error)
    chNotifyReturn:      make(chan amqp.Return)
    chNotifyCancel:      make(chan string)
    
    ch.NotifyClose(c.chNotifyClose)
    ch.NotifyReturn(c.chNotifyReturn)
    ch.NotifyCancel(c.chNotifyCancel)
    

    3. 其他功能

    3.1. 消息持久化

    • Publish a Persistent Pulishing
    func publishToQueue(pub *Publishment) error {
        message, err := json.Marshal(pub.Payload)
        if err != nil {
            msg := "Failed to publish a message in json.marshal"
            log.Printf("%s: %#v - %s", msg, pub, err)
            return err
        }
    
        err = producerCh.amqpCh.Publish(
            pub.Exchange,
            pub.RoutingKey,
            false,
            false,
            amqp.Publishing{
                ContentType:  "text/plain",
                Body:         message,
                DeliveryMode: amqp.Persistent,
                //Expiration: "5000",
            })
        if err != nil {
            msg := "Failed to publish a message"
            log.Printf("%s: %s", msg, err)
            return err
        }
        //log.Printf("Publish message: %#v", pub)
        return nil
    }
    

    https://www.cnblogs.com/davenkin/p/rabbitmq-best-practices.html

    • Declare a durable Exchange and Queue

    Durable and Non-Auto-Deleted queues will survive server restarts and remain
    when there are no remaining consumers or bindings. Persistent publishings will
    be restored in this queue on server restart. These queues are only able to be
    bound to durable exchanges.

    Make sure durable=true.

    amqpCh.ExchangeDeclare(c.exchange, "topic", true, false, false, false, nil)
    amqpCh.QueueDeclare(c.queue, true, false, false, false, args)
    

    3.2. Lazy Queue

    惰性队列和持久化消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。
    https://blog.csdn.net/u013256816/article/details/77987216

    amqpCh.QueueDeclare时,增加args参数。

    args := amqp.Table{}
    args["x-queue-mode"] = "lazy"
    

    3.3. Queue TTL

    RabbitMQ支持Message TTL和Queue TTL,可以先给Queue设置一个Default TTL。
    https://www.rabbitmq.com/ttl.html

    amqpCh.QueueDeclare时,增加args参数。

    args := amqp.Table{}
    args["x-message-ttl"] = 7 * 86400 * 1000 // ms
    

    3.4. 死信队列

    三种情况的message会被resend到死信exchange,基于死信Exchange,可以建立死信队列。

    • 消息被拒绝(basic.rejcet/basic.nack)且requeue=false
    • 消息TTL过期
    • 队列达到最大长度

    amqpCh.QueueDeclare时,增加args参数。

    args := amqp.Table{}
    args["x-dead-letter-exchange"] = rabbitmqdlx
    

    3.5. 限流

    // 每次只推1条
    ch.Qos(1, 0, false)
    // 设置Channel autoAck一定要设置为false,才能做限流
    amqpCh.Consume(c.queue, c.tag, false, false, false, false, nil)
    

    3.6. 消费者保证消息幂等操作

    https://blog.csdn.net/eluanshi12/article/details/88959856

    3.7. 数据安全相关

    • Vhost
    • 账号密码
    • TLS证书

    4. 坑s

    坑1,amqp.channel.Close()

    • amqp.channel.Close()好像有bug,释放资源不干净,反复重建channel会报错,有空写个test程序试一下

    坑2,Queue和Xchange Declare参数需要一致(除了nowait字段)

    • 问题:QueueDeclare的参数要一致,否则报错,且retry会被block住
    • 解决:如果要改变参数,先delete,再declare。是否可删除,需要注意

    坑3,对Delivery有三种操作,可能存在重复推送,幂等需要应用系统来保证

    • func (d Delivery) Ack(multiple bool) error
    • func (d Delivery) Reject(requeue bool) error
    • func (d Delivery) Nack(multiple, requeue bool) error

    如果Reject(true),会重复推送同一条数据给当前ch的当前consumer,一直一直。。

    更多使用规范

    https://www.cloudamqp.com/blog/part4-rabbitmq-13-common-errors.html

    5. Next

    Publishing Ack

    RabbitMQ Cluster

    Quorum-queues

    相关文章

      网友评论

        本文标题:RabbitMQ入门学习3 - Go Package实现

        本文链接:https://www.haomeiwen.com/subject/dinvrltx.html