美文网首页
什么是死信队列?如何实现消息的幂等性?拿来吧你!

什么是死信队列?如何实现消息的幂等性?拿来吧你!

作者: 张清柏 | 来源:发表于2021-07-27 21:56 被阅读0次

    死信队列

    • 死信队列:DLX,dead-letter-exchange,一般是由于消息被否定了,消息过期了,或者消息队列超过最大长度导致信息不能被正常消费,那么这条消息就成了死信消息,如果我们绑定了死信队列,那么这个消息就会被投递到死信队列

    • 使用场景:
      使用rabbitmq的死信队列完成对库存、题库的回收工作,比如 某个商品被下单了减了库存,但是迟迟没有付款,超过30分钟我们就默认订单取消,并恢复库存。或者在医生抢题 答题的业务中,有的医生抢了10道题,但是只在有效期(比如1天)内答了3道题,但是剩余的7道题不可能一直被这个医生绑定,超过1天就要被解绑回库。

    • 先说明以下内容:
      1.队列可以在producer里面声明创建和绑定,也可以在consumer里面声明创建和绑定。declare并不关系和影响你的逻辑
      2.如果你的死信队列想使用fanout ,在绑定的时候不要绑定key 即可,即key为空,如果你想使用direct,则必须指定key,其他类型也是如此。

    我们先来看消费的代码

    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
    )
    
    func main() {
        //声明交换机和消费的队列和key
        var exchange = "direct_guofu_exchange"
        var queue = "direct_guofu_queue_dlx_key"
        var key = "direct_key"
        
        //声明死信队列的交换机和路由键
        var dlxExchange = "dlx_exchange"//死信队列交换机
        var dlxKey = "dlx_key"//死信队列路由键
    
        //建立连接  用户名+密码+ip+端口号+vhost
        conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
        //建立通道
        ch, _ := conn.Channel()
        //试探性声明交换机类型
        ch.ExchangeDeclare(
            exchange,
            "direct",
            true,
            false,
            false,
            false,
            nil,
        )
    
        //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
        _, err := ch.QueueDeclare(
            queue,
            true,
            false,
            false,
            false,
            //此处绑定死信交换机和路由键,不指定路由键默认是fanout模式
            amqp.Table{
                "x-dead-letter-exchange":dlxExchange,//交换机
                "x-message-ttl":6000,//消息过期时间
                "x-dead-letter-routing-key":dlxKey,//绑定死信队列的key
            },
        )
        if err != nil {
            panic(err)
        }
        //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
        ch.QueueBind(queue, key, exchange, false, nil)
        //选择消费死信队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
        msg, err := ch.Consume(
            queue,
            "",
            false,
            false,
            false,
            false,
            nil,
        )
        for d := range msg {
            fmt.Println(string(d.Body))
            d.Ack(false)
    
        }
    
    }
    
    
    
    • 我们再来看消费死信队列的代码,这里面就是普通消费队列的代码,没有什么特殊
    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
    )
    
    func main() {
        var dlxExchange = "dlx_exchange"//死信队列交换机
        var dlxKey = "dlx_key"//死信队列交换机
        var dxlxQueue="dlx_queue"
        //建立连接  用户名+密码+ip+端口号+vhost
        conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
        //建立通道
        ch, _ := conn.Channel()
        //试探性声明交换机类型
        ch.ExchangeDeclare(
            dlxExchange,
            "direct",
            true,
            false,
            false,
            false,
            nil,
        )
    
        //试探性创建队列
        //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
        _, err := ch.QueueDeclare(
            dxlxQueue,
            true,
            false,
            false,
            false,
            nil,
        )
        if err != nil {
            panic(err)
        }
        //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
        ch.QueueBind(dxlxQueue, dlxKey, dlxExchange, false, nil)
    
        //选择消费死信队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
        msg, err := ch.Consume(
            dxlxQueue,
            "",
            false,
            false,
            false,
            false,
            nil,
        )
        for d := range msg {
            fmt.Println(string(d.Body))
            d.Ack(false)
    
        }
    
    }
    
    
    • 生产代码,也是一段普通的生产代码
    package main
    
    import (
        "github.com/streadway/amqp"
    )
    
    /**
     * @Description: 演示死信队列,
     */
    func main() {
        var exchange = "direct_guofu_exchange"
        var key = "direct_key"
        //建立连接  用户名+密码+ip+端口号+vhost
        conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
        //建立通道
        ch, _ := conn.Channel()
        //声明交换机类型
        ch.ExchangeDeclare(
            exchange,
            "direct",
            true,
            false,
            false,
            false,
            nil,
        )
    
        //定义消息
        msgBody := "i am a dead_letter"
        //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
        err := ch.Publish(
            exchange, //exchange
            key,      //routing key(queue name)
            false,
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent, //Msg set as persistent
                ContentType:  "text/plain",
                Body:         []byte(msgBody),
            })
    
        if err != nil {
            panic(err)
        }
    }
    
    
    • 我们暂停队列的正常消费,看看6秒后能否进入到死信队列


      image.png
    • 从上图可以看到,死信消息在6s后被投递到死信队列

    消息的幂等性参考

    相关文章

      网友评论

          本文标题:什么是死信队列?如何实现消息的幂等性?拿来吧你!

          本文链接:https://www.haomeiwen.com/subject/xxznmltx.html