    docker pull rabbitmq:management


    方式一:使用默认用户 guest,密码也是 guest
    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

    方式二:通过环境变量自定义用户名和密码(与方式一二选一,两者使用相同的容器名和端口)
    docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management





    package main
    import (
    // Settings shared by the publisher example.
    const (
        // uri is the AMQP connection URI with the default guest/guest
        // credentials; the part after "@" is meant to hold the broker host IP.
        uri = "amqp://guest:guest@"
        // exchangeName is the exchange to publish to. The empty string is the
        // default exchange, which routes by queue name.
        exchangeName = ""
        // queueName is the queue that is declared and used as the routing key.
        queueName = "test-queues"
        // bodyMsg is the message body sent by main.
        bodyMsg string = "hello angel"
    )
    // failOnError aborts the program when err is non-nil.
    //
    // It logs "<msg>: <err>" and then panics with that same text. A nil err is a
    // no-op.
    //
    // The previous version called log.Fatalf first, which exits the process
    // immediately: the panic after it could never run and deferred cleanup in
    // the callers (closing the connection and channel) was skipped. log.Panic
    // logs and then panics, so deferred functions still run.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    func main() {
        publish(uri, exchangeName, queueName, bodyMsg)
        log.Printf("published %dB OK", len(bodyMsg))
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    //@body, 主体内容
    func publish(amqpURI string, exchange string, queue string, body string) {
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil, // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
        // Producer只能发送到exchange,它是不能直接发送到queue的
        // 现在我们使用默认的exchange(名字是空字符)这个默认的exchange允许我们发送给指定的queue
        // routing_key就是指定的queue名字
        err = channel.Publish(
            exchange, // exchange
            q.Name, // routing key
            false, // mandatory
            false, // immediate
                Headers: amqp.Table{},
                ContentType: "text/plain",
                ContentEncoding: "",
                Body: []byte(body),
        failOnError(err, "Failed to publish a message")


    package main
    import (
    // Settings shared by the publisher example.
    const (
        // uri is the AMQP connection URI with the default guest/guest
        // credentials.
        uri = "amqp://guest:guest@"
        // exchangeName is the exchange to publish to. The empty string is the
        // default exchange, which routes by queue name.
        exchangeName = ""
        // queueName is the queue that is declared and used as the routing key.
        queueName = "test-queues-acknowledgments"
    )
    // failOnError aborts the program when err is non-nil.
    //
    // It logs "<msg>: <err>" and then panics with that same text. A nil err is a
    // no-op.
    //
    // The previous version called log.Fatalf first, which exits the process
    // immediately: the panic after it could never run and deferred cleanup in
    // the callers (closing the connection and channel) was skipped. log.Panic
    // logs and then panics, so deferred functions still run.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    func main() {
        bodyMsg := bodyFrom(os.Args)
        publish(uri, exchangeName, queueName, bodyMsg)
        log.Printf("published %dB OK", len(bodyMsg))
    // bodyFrom derives the message body from a command-line style argument list.
    //
    // args[0] is treated as the program name and skipped. The remaining
    // arguments are joined with single spaces. When there are no further
    // arguments, or the first one is empty, the default "hello angel" is
    // returned.
    //
    // Only the args parameter is inspected. The earlier version tested
    // os.Args[1], which ignored the parameter and could index out of range.
    func bodyFrom(args []string) string {
        if len(args) < 2 || args[1] == "" {
            return "hello angel"
        }
        return strings.Join(args[1:], " ")
    }
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    //@body, 主体内容
    func publish(amqpURI string, exchange string, queue string, body string) {
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            false,     // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
        // Producer只能发送到exchange,它是不能直接发送到queue的。
        // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
        // routing_key就是指定的queue名字。
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
        failOnError(err, "Failed to publish a message")


    package main
    import (
    // Settings shared by the consumer example.
    const (
        // uri is the AMQP connection URI with the default guest/guest
        // credentials.
        uri = "amqp://guest:guest@"
        // exchangeName is the exchange name handed to consumer. The empty
        // string is the default exchange.
        exchangeName = ""
        // queueName is the queue that is declared and consumed from.
        queueName = "test-queues-acknowledgments"
    )
    // failOnError aborts the program when err is non-nil.
    //
    // It logs "<msg>: <err>" and then panics with that same text. A nil err is a
    // no-op.
    //
    // The previous version called log.Fatalf first, which exits the process
    // immediately: the panic after it could never run and deferred cleanup in
    // the callers (closing the connection and channel) was skipped. log.Panic
    // logs and then panics, so deferred functions still run.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    func main() {
        consumer(uri, exchangeName, queueName)
    //@amqpURI, amqp的地址
    //@exchange, exchange的名称
    //@queue, queue的名称
    func consumer(amqpURI string, exchange string, queue string) {
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
        log.Printf("got queue, declaring %q", queue)
        q, err := channel.QueueDeclare(
            queueName, // name
            false,     // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        failOnError(err, "Failed to declare a queue")
        log.Printf("Queue bound to Exchange, starting Consume")
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        failOnError(err, "Failed to register a consumer")
        forever := make(chan bool)
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")



