美文网首页
RabbitMQ消费 发送

RabbitMQ消费 发送

作者: Feng_Sir | 来源:发表于2018-01-03 16:08 被阅读0次
    package connectors
    
    import (
        "../commons"
        "github.com/streadway/amqp"
        log "github.com/sirupsen/logrus"
        "bytes"
        "strconv"
        "fmt"
    )
    
    // failOnError logs "msg: err" and then panics with that same text when err is
    // non-nil; it does nothing when err is nil.
    //
    // The failure is logged at error level rather than with log.Fatalf: logrus'
    // Fatalf terminates the process, which made the panic unreachable and skipped
    // every deferred Close/recover in the callers (Send installs a recover that
    // could therefore never run).
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        text := fmt.Sprintf("%s: %s", msg, err)
        log.Error(text)
        panic(text)
    }
    
    // rabbitConnector dials the broker described by commons.RabbitConfig.
    //
    // The address has the form amqp://Username:Password@Host:Port/Vhost and is
    // assembled verbatim from the config fields (no URL-escaping is applied).
    //
    // It returns the open connection, or a nil connection together with the dial
    // error. The error is handed back to the caller instead of being passed to
    // failOnError here, so it is handled at exactly one layer.
    func rabbitConnector() (*amqp.Connection, error) {
        cfg := commons.RabbitConfig

        var addr bytes.Buffer
        for _, part := range []string{
            "amqp://", cfg.Username, ":", cfg.Password,
            "@", cfg.Host, ":", strconv.Itoa(cfg.Port),
            "/", cfg.Vhost,
        } {
            addr.WriteString(part)
        }

        conn, err := amqp.Dial(addr.String())
        if err != nil {
            return nil, err
        }
        return conn, nil
    }
    
    // Send publishes msg as a text/plain message to the exchange named by
    // commons.RabbitConfig.Exchange, using the routing key "custom".
    //
    // Each call opens its own connection and channel, declares the exchange as a
    // durable topic exchange, publishes once, and closes both on return. Every
    // step is checked with failOnError; a panic raised along the way is recovered
    // here and logged.
    func Send(msg string) {
        defer func() {
            if r := recover(); r != nil {
                log.Errorf("RabbitMQ发送存储消息错误 %s", r)
            }
        }()

        cfg := commons.RabbitConfig

        conn, err := rabbitConnector()
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        channel, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        // Arguments: name, type, durable, auto-deleted, internal, no-wait, args.
        err = channel.ExchangeDeclare(cfg.Exchange, "topic", true, false, false, false, nil)
        failOnError(err, "Failed to declare an exchange")

        payload := amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(msg),
        }
        // Arguments: exchange, routing key, mandatory, immediate, message.
        err = channel.Publish(cfg.Exchange, "custom", false, false, payload)
        failOnError(err, "Failed to publish a message")
    }
    
    func RabbitConsume() {
        conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/dashboard", commons.RabbitConfig.Username, commons.RabbitConfig.Password, commons.RabbitConfig.Host, commons.RabbitConfig.Port))
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
    
        err = ch.ExchangeDeclare(
            "dashboard", // name
            "topic",     // type
            true,        // durable
            false,       // auto-deleted
            false,       // internal
            false,       // no-wait
            nil,         // arguments
        )
        failOnError(err, "Failed to declare an exchange")
    
        q, err := ch.QueueDeclare(
            "", // name
            true,        // durable
            false,        // delete when usused
            false,        // exclusive
            false,        // no-wait
            nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        err = ch.QueueBind(
            q.Name,      // queue name
            "custom",    // routing key
            "dashboard", // exchange
            false,
            nil)
        failOnError(err, "Failed to bind a queue")
    
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
    
        forever := make(chan bool)
    
        go func() {
            for d := range msgs {
                log.Printf(" [x] %s", d.Body)
            }
        }()
    
        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
    }
    
    

    相关文章

      网友评论

          本文标题:RabbitMQ消费 发送

          本文链接:https://www.haomeiwen.com/subject/kyohnxtx.html