问题:
rabbitmq 的吞吐有多大?
目前开启了rabbitmq的持久化/Ack 机制/用的是direct路由器。
上次测试,4.5w/min 的时候,beam.smp cpu利用率过100%
下午做了个测试,不阻塞下游,只测试rabbitmq 的消费速度:
package main
import (
"fmt"
"github.com/streadway/amqp"
"sync"
)
const (
queueName = "queue.newpushservice.deal.push"
exchange = "exchange.newpushservice.deal.push"
mqurl = "amqp://admin:admin@172.18.19.168:5672/"
ReceiverNum int = 5
ChannelBufferLength = 1000
)
func main() {
fmt.Println("start")
forever := make(chan bool)
go WorkMessage(ReceiveMessage())
<-forever
}
func getChannel() (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(mqurl)
if err != nil {
return nil, nil, err
}
channel, err := conn.Channel()
if err != nil {
return nil, nil, err
}
err = channel.Qos(1, 0, false)
if err != nil {
return nil, nil, err
}
return conn, channel, nil
}
func ReceiveMessage() <-chan amqp.Delivery {
out := make(chan amqp.Delivery, ChannelBufferLength)
var wg sync.WaitGroup
receiver := func() {
defer wg.Done()
RECONNECT:
for {
_, channel, err := getChannel()
if err != nil {
panic(err)
}
msgs, err := channel.Consume(
queueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
//检测是否改通道有问题
closeChan := make(chan bool, 1)
go func() {
cc := make(chan *amqp.Error)
e := <-channel.NotifyClose(cc)
fmt.Printf("[RABBITMQ_CLIENT], channel close error:%s", e.Error())
closeChan <- true
}()
for {
select {
case msg, ok := <-msgs:
if !ok {
fmt.Println("channel be closed,need reconnect")
continue RECONNECT
}
out <- msg
case <-closeChan:
continue RECONNECT
}
}
}
}
wg.Add(ReceiverNum)
for i := 0; i < ReceiverNum; i++ {
go receiver()
}
go func() {
wg.Wait()
fmt.Println("all receiver is done, closing channel")
close(out)
}()
return out
}
func WorkMessage(in <-chan amqp.Delivery) {
var wg sync.WaitGroup
worker := func(msg amqp.Delivery) {
defer wg.Done()
fmt.Printf("receive message : %s", string(msg.Body))
msg.Ack(false)
}
wg.Add(1)
go func() {
defer wg.Done()
for message := range in {
wg.Add(1)
go worker(message)
}
}()
go func() {
wg.Wait()
fmt.Println("all worker is done, closing channel")
}()
}
消费速度在 30-40w/min,cpu 占有率在 180-200% , 内存占用在8%-10%。
4核64位系统,内存16G,cpu 型号( 8 Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz)
这里有个测试,留着遇到问题可以看
https://segmentfault.com/a/1190000016351345
网友评论