美文网首页
RabbitMQ功能实现1- 红包未领取退回

RabbitMQ功能实现1- 红包未领取退回

作者: hi李昊天 | 来源:发表于2020-06-20 20:34 被阅读0次

    生产者代码:

    package main
    
    import (
        uuid "github.com/satori/go.uuid"
        "github.com/streadway/amqp"
        "github.com/wonderivan/logger"
        "rmq/db/rmq"
        "time"
    )
    
    const (
        DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机
        DeadLettersQueueName    = "dlx_queue_packet"    // 死信队列
    
        QueueName    = "queue_packet"    // 目标队列
        ExchangeName = "exchange_packet" // 目标交换机
    )
    
    var (
        ch       *amqp.Channel
        err      error
        conn     *amqp.Connection
        queue    amqp.Queue
        dlxQueue amqp.Queue
    )
    
    func main() {
    
        if conn, err = rmq.GetConn(); err != nil {
            logger.Error("连接RabbitMQ服务器失败:%s", err.Error())
            return
        }
    
        defer conn.Close()
    
        if ch, err = conn.Channel(); err != nil {
            logger.Error("获取Channel失败:%s", err.Error())
            return
        }
    
        defer ch.Close()
    
        // 声明队列交换机
        if err = ch.ExchangeDeclare(ExchangeName, amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
            logger.Error("声明业务交换机失败:%s", err.Error())
            return
        }
    
        // 创建死信交换机
        if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
            logger.Error("创建死信交换机:%s", err.Error())
            return
        }
    
        // 创建死信队列
        if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {
            logger.Error("创建死信队列失败:%s", err.Error())
            return
        }
    
        // 创建业务队列
        if queue, err = ch.QueueDeclare(QueueName, true, false, false, false, amqp.Table{
            "x-message-ttl":          6000,                    // 消息过期时间 毫秒
            "x-dead-letter-exchange": DeadLettersExchangeName, // 死信交换机
            // "x-dead-letter-routing-key": "dlxKey",       // 死信路由key
        }); err != nil {
            logger.Warn("创建业务队列失败:%s", err.Error())
            return
        }
    
        // 业务队列绑定交换机
        if err = ch.QueueBind(queue.Name, "", ExchangeName, false, nil); err != nil {
            logger.Error("绑定业务交换机失败:%s", err.Error())
            return
        }
    
        // 死信队列绑定死信交换机
        if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {
            logger.Error("绑定死信交换机失败:%s", err.Error())
        }
    
        for i := 1; i <= 10; i++ {
            msg := amqp.Publishing{
                MessageId:   uuid.NewV4().String(),
                ContentType: "text/plain",
                Body:        []byte("红包退回"),
            }
    
            // 发布消息
            err = ch.Publish(
                ExchangeName,
                "",
                false,
                false,
                msg,
            )
    
            if err != nil {
                logger.Error("发送失败: %s", err.Error())
                return
            } else {
                logger.Info("发送成功:%s", msg.MessageId)
            }
        }
    }
    

    消费者代码

    package main
    
    import (
        uuid "github.com/satori/go.uuid"
        "github.com/streadway/amqp"
        "github.com/wonderivan/logger"
        "rmq/db/rmq"
        "time"
    )
    
    const (
        DeadLettersExchangeName = "dlx_exchange_packet" // 死信交换机
        DeadLettersQueueName    = "dlx_queue_packet"    // 死信队列
    
        QueueName    = "queue_packet"    // 目标队列
        ExchangeName = "exchange_packet" // 目标交换机
    )
    
    var (
        ch       *amqp.Channel
        err      error
        conn     *amqp.Connection
        queue    amqp.Queue
        dlxQueue amqp.Queue
    )
    
    
    func main() {
        if conn, err = rmq.GetConn(); err != nil {
            logger.Error("连接RabbitMQ服务器失败:%s", err.Error())
            return
        }
    
        defer conn.Close()
    
        if ch, err = conn.Channel(); err != nil {
            logger.Error("获取Channel失败:%s", err.Error())
            return
        }
    
        defer ch.Close()
    
        // 创建死信交换机
        if err = ch.ExchangeDeclare(DeadLettersExchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
            logger.Error("创建死信交换机:%s", err.Error())
            return
        }
    
        // 创建死信队列
        if dlxQueue, err = ch.QueueDeclare(DeadLettersQueueName, true, false, false, false, nil); err != nil {
            logger.Error("创建死信队列失败:%s", err.Error())
            return
        }
    
        // 死信队列绑定死信交换机
        if err = ch.QueueBind(dlxQueue.Name, "", DeadLettersExchangeName, false, nil); err != nil {
            logger.Error("绑定死信交换机失败:%s", err.Error())
        }
    
        msgList, err := ch.Consume(dlxQueue.Name, "", false, false, false, false, nil)
        if err != nil {
            logger.Error("消费者监听失败:%s", err.Error())
            return
        }
    
        for {
            select {
            case message, ok := <-msgList:
                if !ok {
                    continue
                }
    
                go func(msg amqp.Delivery) {
                    logger.Info("messageID: %s", msg.MessageId)
                    logger.Info("messageBody: %s", msg.Body)
                    if err = msg.Ack(false); err != nil {
                        logger.Error("确认消息失败")
                    }
                }(message)
            case <-time.After(time.Second):
    
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:RabbitMQ功能实现1- 红包未领取退回

          本文链接:https://www.haomeiwen.com/subject/tligxktx.html