美文网首页
golang遇到docker很简单

golang遇到docker很简单

作者: mick_ | 来源:发表于2019-12-31 08:56 被阅读0次

    本文转自:https://www.cnblogs.com/angelyan/p/11218260.html

    一、拉取指定版本的镜像(management 版本自带 web 管理页面)

    docker pull rabbitmq:management

    二、运行镜像

    方式一:默认guest 用户,密码也是 guest
    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
    方式二:设置用户名和密码
    docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

    三、访问ui页面

    http://localhost:15672/

    image

    四、golang案例

    #producer生产者代码
    package main
    
    import (
        "fmt"
    
        "log"
    
        "github.com/streadway/amqp"
    )
    
    // Settings used by the producer example.
    const (
        // uri is the AMQP connection URI; 10.0.0.11 is the IP of the host.
        uri = "amqp://guest:guest@10.0.0.11:5672/"

        // exchangeName is the AMQP exchange to publish to (empty string here).
        exchangeName = ""

        // queueName is the name of the AMQP queue.
        queueName = "test-queues"

        // bodyMsg is the body of the message that gets published.
        bodyMsg string = "hello angel"
    )
    
    // failOnError aborts the program when err is non-nil: it logs "<msg>: <err>"
    // and then panics with that same text. It does nothing when err is nil.
    //
    // log.Panic is used instead of log.Fatalf because Fatalf calls os.Exit, which
    // skips deferred calls and made the panic that used to follow it unreachable.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main publishes the fixed bodyMsg through publish and then logs how many
    // bytes were sent.
    func main() {
        // Send the message to the configured exchange and queue.
        publish(uri, exchangeName, queueName, bodyMsg)

        sent := len(bodyMsg)
        log.Printf("published %dB OK", sent)
    }
    
    // publish sends one text/plain message to a RabbitMQ queue.
    //
    //   - amqpURI:  address of the AMQP server to dial
    //   - exchange: name of the exchange to publish to
    //   - queue:    name of the queue to declare; the declared queue's name is
    //     used as the routing key
    //   - body:     message payload
    //
    // Every failure is passed to failOnError.
    func publish(amqpURI string, exchange string, queue string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the queue argument (not the package-level
        // constant), so callers actually control which queue is used.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

        // A producer can only send to an exchange, never directly to a queue.
        // With the default exchange (empty name) the message is delivered to the
        // queue whose name equals the routing key.
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }
    
    image.png

    生产者生产数据

    #producer
    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "os"
        "strings"
    )
    
    // Settings used by the acknowledgment producer example.
    const (
        // uri is the AMQP connection URI.
        uri = "amqp://guest:guest@10.0.0.11:5672/"

        // exchangeName is the AMQP exchange to publish to (empty string here).
        exchangeName = ""

        // queueName is the name of the AMQP queue.
        queueName = "test-queues-acknowledgments"
    )
    
    // failOnError aborts the program when err is non-nil: it logs "<msg>: <err>"
    // and then panics with that same text. It does nothing when err is nil.
    //
    // log.Panic is used instead of log.Fatalf because Fatalf calls os.Exit, which
    // skips deferred calls and made the panic that used to follow it unreachable.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    // main builds the message body from the command-line arguments, publishes it
    // through publish, and logs how many bytes were sent.
    func main() {
        message := bodyFrom(os.Args)

        // Send the message to the configured exchange and queue.
        publish(uri, exchangeName, queueName, message)

        size := len(message)
        log.Printf("published %dB OK", size)
    }
    
    // bodyFrom builds the message body from a command-line style argument list.
    // args[0] is skipped. If there is no further argument, or the first one is
    // empty, the default "hello angel" is returned; otherwise the remaining
    // arguments are joined with single spaces.
    func bodyFrom(args []string) string {
        // Inspect args rather than os.Args so the function honours its parameter
        // and cannot index past the end of a shorter os.Args.
        if len(args) < 2 || args[1] == "" {
            return "hello angel"
        }
        return strings.Join(args[1:], " ")
    }
    
    // publish sends one text/plain message to a RabbitMQ queue.
    //
    //   - amqpURI:  address of the AMQP server to dial
    //   - exchange: name of the exchange to publish to
    //   - queue:    name of the queue to declare; the declared queue's name is
    //     used as the routing key
    //   - body:     message payload
    //
    // Every failure is passed to failOnError.
    func publish(amqpURI string, exchange string, queue string, body string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the queue argument (not the package-level
        // constant), so callers actually control which queue is used.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

        // A producer can only send to an exchange, never directly to a queue.
        // With the default exchange (empty name) the message is delivered to the
        // queue whose name equals the routing key.
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }
    

    消费者消费数据

    #consumer
    package main
    
    import (
        "bytes"
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "time"
    )
    
    // Settings used by the acknowledgment consumer example.
    const (
        // uri is the AMQP connection URI.
        uri = "amqp://guest:guest@10.0.0.11:5672/"

        // exchangeName is the AMQP exchange name (empty string here).
        exchangeName = ""

        // queueName is the name of the AMQP queue to consume from.
        queueName = "test-queues-acknowledgments"
    )
    
    // failOnError aborts the program when err is non-nil: it logs "<msg>: <err>"
    // and then panics with that same text. It does nothing when err is nil.
    //
    // log.Panic is used instead of log.Fatalf because Fatalf calls os.Exit, which
    // skips deferred calls and made the panic that used to follow it unreachable.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Panic(fmt.Sprintf("%s: %s", msg, err))
    }
    
    func main() {
        // Start the message consumer with the configured URI, exchange and queue.
        consumer(uri, exchangeName, queueName)
    }
    
    // consumer receives messages from a RabbitMQ queue and acknowledges each one
    // manually after it has been processed. The call blocks and does not return.
    //
    //   - amqpURI:  address of the AMQP server to dial
    //   - exchange: name of the exchange (accepted but not used by this function)
    //   - queue:    name of the queue to declare and consume from
    //
    // Setup failures are passed to failOnError.
    func consumer(amqpURI string, exchange string, queue string) {
        // Open the connection.
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()

        // Open a channel on the connection.
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()

        log.Printf("got queue, declaring %q", queue)

        // Declare the queue named by the queue argument (not the package-level
        // constant), so callers actually control which queue is used.
        q, err := channel.QueueDeclare(
            queue, // name
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        log.Printf("Queue bound to Exchange, starting Consume")
        // Subscribe to the queue; auto-ack is off, so every delivery is
        // acknowledged explicitly below.
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        // forever is never written to; receiving from it blocks this function.
        forever := make(chan bool)

        // Process deliveries in a separate goroutine.
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                // Simulate work: sleep one second for every '.' in the body.
                dotCount := bytes.Count(d.Body, []byte("."))
                time.Sleep(time.Duration(dotCount) * time.Second)
                log.Printf("Done")
                // Report a failed acknowledgment instead of dropping it silently.
                if ackErr := d.Ack(false); ackErr != nil {
                    log.Printf("Failed to ack message: %s", ackErr)
                }
            }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

        // Nothing is ever sent on forever, so this receive blocks indefinitely
        // and keeps the program running.
        <-forever
    }
    

    相关文章

      网友评论

          本文标题:golang遇到docker很简单

          本文链接:https://www.haomeiwen.com/subject/zctboctx.html