美文网首页
封装 golang 使用路由模式调用 rabbitmq

封装 golang 使用路由模式调用 rabbitmq

作者: dwq1666666 | 来源:发表于2021-04-30 16:50 被阅读0次

    参考文档地址:
    http://rabbitmq.mr-ping.com/tutorials_with_golang/[6]RPC.html

    抽取出来的封装包 rabbitmq.go(包名为 rabbit,文件开头需要声明 package rabbit):

    
    import (
        "github.com/streadway/amqp"
        "log"
        "sync"
    )
    
    // failOnError is a no-op when err is nil. Otherwise it logs msg followed by
    // the error text through log.Fatalf, which terminates the process.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Fatalf("%s: %s", msg, err)
    }
    
    // pool hands out AMQP connections. When it has nothing to hand out, New
    // dials the local broker with the guest account; a dial failure terminates
    // the process through failOnError.
    //
    // NOTE(review): sync.Pool may discard idle items at any time, and nothing
    // in this file closes a connection, so a discarded connection stays open
    // on the broker side until the process exits.
    var pool = sync.Pool{
        New: func() interface{} {
            connection, dialErr := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(dialErr, "Failed to connect to RabbitMQ")
            return connection
        },
    }
    
    // MessageQueue bundles the AMQP connection and channel used by this wrapper
    // with the names of the exchange and queue it operates on.
    type MessageQueue struct {
        // conn is the connection currently held by this instance; it is taken
        // from pool by SendMessage/GetMessage and handed back by Close.
        conn *amqp.Connection
        // ch is the channel most recently opened on conn by getExchange.
        ch *amqp.Channel
        // ExchangeName is the name of the exchange that is declared and used
        // for publishing and binding.
        ExchangeName string
        // QueueName is the name of the queue GetMessage declares and consumes.
        QueueName string
    }
    
    // getExchange opens a new channel on mq.conn, stores it in mq.ch and then
    // declares a "direct" exchange named exchange on that channel. Either step
    // failing terminates the process through failOnError.
    //
    // NOTE(review): each call replaces mq.ch without closing the channel that
    // was stored there before.
    func (mq *MessageQueue) getExchange(exchange string) {
        channel, openErr := mq.conn.Channel()
        failOnError(openErr, "Failed to open a channel")
        mq.ch = channel

        declareErr := channel.ExchangeDeclare(
            exchange, // name
            "direct", // type
            true,     // durable: the exchange definition survives a broker restart
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(declareErr, "Failed to declare an exchange")
    }
    
    // SendMessage publishes msg as a persistent text/plain message to the
    // exchange mq.ExchangeName, using msgType as the routing key.
    //
    // The connection already held by mq is reused while it is open; a
    // connection is taken from pool only when mq holds none or the held one has
    // been closed. Any failure terminates the process through failOnError.
    //
    // NOTE(review): getExchange opens a channel on every call, so repeated
    // publishes still accumulate channels on the connection.
    func (mq *MessageQueue) SendMessage(msgType, msg string) {
        // Fetching from the pool unconditionally would overwrite (and thereby
        // leak) the connection obtained by the previous call.
        if mq.conn == nil || mq.conn.IsClosed() {
            mq.conn = pool.Get().(*amqp.Connection)
        }

        // Open a channel and make sure the exchange exists.
        mq.getExchange(mq.ExchangeName)

        err := mq.ch.Publish(
            mq.ExchangeName, // exchange
            msgType,         // routing key
            false,           // mandatory
            false,           // immediate
            amqp.Publishing{
                // The exchange and the consumer's queue are declared durable;
                // the message itself must be marked persistent as well or it
                // is lost when the broker restarts.
                DeliveryMode: amqp.Persistent,
                ContentType:  "text/plain",
                Body:         []byte(msg),
            })
        failOnError(err, "Failed to publish a message")
    }
    
    // Close releases what mq holds: it closes the channel and then hands the
    // connection back to pool. It is safe to call on an instance that never
    // opened anything and safe to call more than once.
    func (mq *MessageQueue) Close() {
        // Close the channel before giving the connection away, so the pooled
        // connection is not handed to another user with our channel still open.
        if mq.ch != nil {
            if err := mq.ch.Close(); err != nil {
                log.Printf("Failed to close channel: %s", err)
            }
            mq.ch = nil
        }

        if mq.conn != nil {
            // Never pool a nil or already-closed connection: the next Get
            // would receive something it cannot open a channel on.
            if !mq.conn.IsClosed() {
                pool.Put(mq.conn)
            }
            mq.conn = nil
        }
    }
    
    // GetMessage declares the exchange mq.ExchangeName and the durable queue
    // mq.QueueName, binds the queue to the exchange with msgType as routing key
    // and starts consuming with manual acknowledgements and a prefetch of one.
    //
    // It returns a channel that yields each message body as a string. A message
    // is acknowledged only after the receiver has taken it from that channel.
    // The returned channel is closed once the underlying delivery channel is
    // closed. Any setup failure terminates the process through failOnError.
    func (mq *MessageQueue) GetMessage(msgType string) <-chan string {
        // Reuse the connection this instance already holds instead of
        // overwriting (and leaking) it with a fresh one from the pool.
        if mq.conn == nil || mq.conn.IsClosed() {
            mq.conn = pool.Get().(*amqp.Connection)
        }

        // Open a channel and make sure the exchange exists.
        mq.getExchange(mq.ExchangeName)

        // Declare the queue the messages are consumed from.
        q, err := mq.ch.QueueDeclare(
            mq.QueueName, // name
            true,         // durable
            false,        // delete when unused
            false,        // exclusive
            false,        // no-wait
            nil,          // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = mq.ch.QueueBind(
            q.Name,          // queue name
            msgType,         // routing key
            mq.ExchangeName, // exchange
            false,           // no-wait
            nil,             // arguments
        )
        failOnError(err, "Failed to bind a queue")

        // Deliver one unacknowledged message at a time.
        err = mq.ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := mq.ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto ack
            false,  // exclusive
            false,  // no local
            false,  // no wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        ret := make(chan string)

        println("进来循环处理消息")
        go func() {
            // Closing ret lets receivers detect the end of the stream.
            defer close(ret)

            // range stops when msgs is closed. A bare receive inside an
            // endless loop would instead keep yielding zero-value deliveries
            // and forward empty strings forever.
            for d := range msgs {
                mess := string(d.Body)
                println("获得的消息:" + mess)
                ret <- mess
                // Mark the message as consumed.
                if ackErr := d.Ack(false); ackErr != nil {
                    log.Printf("Failed to ack message: %s", ackErr)
                }
            }
        }()

        return ret
    }
    

    客户端

    // main publishes a single message: the first command-line argument is the
    // routing key and the second is the message body.
    func main() {
        // Validate the arguments up front; indexing os.Args blindly panics
        // when the program is started without them.
        if len(os.Args) < 3 {
            println("usage: client <routing-key> <message>")
            os.Exit(1)
        }

        message := rabbit.MessageQueue{ExchangeName: "messageQueue"}
        defer message.Close()

        message.SendMessage(os.Args[1], os.Args[2])
    }
    

    服务端

    // main consumes messages whose routing key equals the first command-line
    // argument, keeps every received body in memory and, on SIGTERM/SIGINT or
    // when the message stream ends, writes them to a.txt (one per line) before
    // exiting.
    func main() {
        // Validate the arguments up front; indexing os.Args blindly panics
        // when the program is started without a routing key.
        if len(os.Args) < 2 {
            fmt.Println("usage: server <routing-key>")
            os.Exit(1)
        }

        // Receive and process messages.
        message := rabbit.MessageQueue{ExchangeName: "messageQueue", QueueName: "messageQueueData"}
        defer message.Close()

        msg := message.GetMessage(os.Args[1])

        // Get notified about termination requests and Ctrl+C.
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)

        ms := make([]string, 0)

        // save persists everything received so far and reports a write failure
        // instead of silently dropping it.
        save := func() {
            data := []byte(strings.Join(ms, "\n"))
            if err := ioutil.WriteFile("a.txt", data, os.ModePerm); err != nil {
                fmt.Println("failed to write a.txt:", err)
            }
        }

        for {
            select {
            case m, ok := <-msg:
                if !ok {
                    // The message channel was closed: nothing more will arrive.
                    save()
                    return
                }
                println(" 开始处理消息: ")
                println(m)

                ms = append(ms, m)

                // Simulated processing time.
                time.Sleep(time.Second * 10)
                println(" 消息处理完毕: ")
            case sig := <-sigs:
                save()
                // Println does not interpret format verbs; use Printf so the
                // signal is formatted rather than printed after a literal "%v".
                fmt.Printf("%v\n", sig)
                return
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:封装 golang 使用路由模式调用 rabbitmq

          本文链接:https://www.haomeiwen.com/subject/elanrltx.html