Golang RabbitMQ 最简单的队列模式,只有一个消息生产者,一个消息消费者。
image.png说明:
P 代表生产者 , C 代表消费者,红色代表队列。
提示:如果不了解RabbitMQ,请先阅读rabbitmq基础概念章节。
1.安装依赖包
go get github.com/streadway/amqp
导入依赖包
import (
"github.com/streadway/amqp"
)
2.发送消息
下面分步骤演示消息生产者如何完成消息推送
2.1. 连接RabbitMQ Server
// 连接RabbitMQ Server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
连接地址说明:
amqp://账号:密码@RabbitMQ地址:端口/
2.2. 创建Channel
大部分操作都是在Channel(信道 )完成的。
ch, err := conn.Channel()
defer ch.Close()
2.3. 声明队列
代表我们需要读写哪个队列
q, err := ch.QueueDeclare(
"hello", // 队列名字
false, // 消息是否持久化
false, // 不使用的时候删除队列
false, // exclusive
false, // no-wait
nil, // arguments
)
2.4. 推送消息
// 消息内容
body := "Hello World!"
// 推送消息
err = ch.Publish(
"", // exchange(交换机名字),这里忽略
q.Name, // 路由参数,这里使用队列名字作为路由参数
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // 消息内容
})
2.5.发送消息完整代码
package main
// 导入包
import (
"log"
"github.com/streadway/amqp"
)
// 错误处理
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建信道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明要操作的队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 要发送的消息内容
body := "Hello World!"
// 发送消息
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
3.接收消息
接收消息的前面三个步骤跟发送消息一样,分别对应2.1、2.2、2.3章节。
完整的接收消息代码如下:
package main
// 导入包
import (
"log"
"github.com/streadway/amqp"
)
// 错误处理
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建信道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明要操作的队列
q, err := ch.QueueDeclare(
"hello", // 队列名需要跟发送消息的队列名保持一致
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 创建消息消费者
msgs, err := ch.Consume(
q.Name, // 队列名
"", // 消费者名字,不填,则自动生成一个唯一ID
true, // 是否自动提交消息,即自动告诉rabbitmq消息已经处理成功。
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
// 循环拉取队列中的消息
for d := range msgs {
// 打印消息内容
log.Printf("Received a message: %s", d.Body)
}
}
网友评论