美文网首页
基于rabbitmq实现的延时队列(golang版)

基于rabbitmq实现的延时队列(golang版)

作者: _老七 | 来源:发表于2020-03-08 20:20 被阅读0次

    虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的

    实现延时队列的基本要素

    1. 存在一个倒计时机制:Time To Live(TTL)
    2. 当到达时间点的时候会触发一个发送消息的事件:Dead Letter Exchanges(DLX)
      ~~~~~~基于第一点,我利用的是消息存在过期时间这一特性, 消息一旦过期就会变成dead letter,可以让单独的消息过期,也可以设置整个队列消息的过期时间 而rabbitmq会有限取两个值的最小
      ~~~~~~基于第二点,是用到了rabbitmq的过期消息处理机制: . x-dead-letter-exchange 将过期的消息发送到指定的 exchange 中 . x-dead-letter-routing-key 将过期的消息发送到自定的 route当中

    在这里例子当中,我使用的是 过期消息+转发指定exchange

    在 golang 中的实现

    首先是消费者comsumer.go

    package main
    
    import (
        "log"
    
        "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
        }
    }
    
    
    func main() {
        // 建立链接
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
    
        // 声明一个主要使用的 exchange
        err = ch.ExchangeDeclare(
            "logs",   // name
            "fanout", // type
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")
    
        // 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
        q, err := ch.QueueDeclare(
            "test_logs",    // name
            false, // durable
            false, // delete when unused
            true,  // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        /**
         * 注意,这里是重点!!!!!
         * 声明一个延时队列, ß我们的延时消息就是要发送到这里
         */
        _, errDelay := ch.QueueDeclare(
            "test_delay",    // name
            false, // durable
            false, // delete when unused
            true,  // exclusive
            false, // no-wait
            amqp.Table{
                // 当消息过期时把消息发送到 logs 这个 exchange
                "x-dead-letter-exchange":"logs",
            },   // arguments
        )
        failOnError(errDelay, "Failed to declare a delay_queue")
    
        err = ch.QueueBind(
            q.Name, // queue name, 这里指的是 test_logs
            "",     // routing key
            "logs", // exchange
            false,
            nil)
        failOnError(err, "Failed to bind a queue")
    
        // 这里监听的是 test_logs
        msgs, err := ch.Consume(
            q.Name, // queue name, 这里指的是 test_logs
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
    
        forever := make(chan bool)
    
        go func() {
            for d := range msgs {
                log.Printf(" [x] %s", d.Body)
            }
        }()
    
        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
    }
    

    然后是生产者productor.go

    package main
    
    import (
        "log"
        "os"
        "strings"
    
        "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
        }
    }
    
    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
    
        body := bodyFrom(os.Args)
        // 将消息发送到延时队列上
        err = ch.Publish(
            "",                 // exchange 这里为空则不选择 exchange
            "test_delay",       // routing key
            false,              // mandatory
            false,              // immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(body),
                Expiration: "5000", // 设置五秒的过期时间
            })
        failOnError(err, "Failed to publish a message")
    
        log.Printf(" [x] Sent %s", body)
    }
    
    func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
            s = "hello"
        } else {
            s = strings.Join(args[1:], " ")
        }
        return s
    }
    
    go run comsumer.go
    go run productor.go
    

    具体看代码和注释就行, 这里的关键点就是将要延时的消息发送到过期队列当中, 然后监听的是过期队列转发到的 exchange 下的队列 正常情况就是始终监听一个队列,然后把过期消息发送到延时队列中,当消息到达时间后就把消息发到正在监听的队列

    本文转至:https://blog.justwe.site/post/go-rabbitmq-delay-queue/

    相关文章

      网友评论

          本文标题:基于rabbitmq实现的延时队列(golang版)

          本文链接:https://www.haomeiwen.com/subject/snvjdhtx.html