分别用了buffer和string 测试,结果几乎一致。
测试结果:
用send 比do快5倍左右,实测到10秒发送92.5w,平均每秒9.2万,do方法推送10秒16w条左右。
建议 发布方也加个订阅消息,接收方拿到消息回复一个状态,发送方定时轮训已发消息的状态,一定时间内没接收到,重新发一次,保障数据不丢失。
json.Marsha 序列化byte数组 ,使用分割符时候总是偶尔出现解包json的多出前缀空格或者分割符来,出现比例万分之几的样子,后来换成proto进行序列化解决了这个问题
我的电脑配置是i5-9400F 主频2.9 6核6线
我是单进程测试的。
id是发送方的记录条数,index是接收方记录的条数
使用Do方法的测试结果图
//_, err := c.Do("Publish", "chat", data)
if err != nil {
idx--
panic(err)
//return "", err
}
![](https://img.haomeiwen.com/i14653704/fe6050e761b0d846.png)
下面是使用Send方法的测试结果
err := c.Send("Publish", "chat", data)
if err != nil {
idx--
panic(err)
//return "", err
} else {
//send 方法 需要使用 c.Flush() 推送
err2 := c.Flush()
if err2 != nil {
idx--
panic(err2)
}
}
![](https://img.haomeiwen.com/i14653704/60b541f2da3b9670.png)
测试代码如下:
func newPool() *redis.Pool {
return &redis.Pool{
MaxIdle: 50,
MaxActive: 50, // max number of connections
Wait: true,
Dial: func() (redis.Conn, error) {
var c redis.Conn
var err error
//for err != nil {
c, err = redis.Dial("tcp", ":6379", redis.DialDatabase(1), redis.DialPassword("123456"))
if err != nil {
fmt.Println("-----------------------链接失败")
panic(err.Error())
}
//}
return c, err
},
}
}
var cbk int
var one int64
func TestCallback2(chann string, msg string) {
//log.Println("testcallback ----", string(msg))
cbk++
buf := bytes.NewBufferString(msg)
//bf.ReadBytes('\n')
btime, err := buf.ReadBytes('\n')
if err != nil {
fmt.Println(err)
return
}
stime := int64(binary.BigEndian.Uint64(btime))
//id, err := buf.ReadBytes('\n')
sid := int64(binary.BigEndian.Uint64(buf.Bytes()))
//stime := string(btime)
//sid := string(id)
//stime := utils.BytesToInt64(btime)
//sid := utils.BytesToInt64(id)
if sid%1000 == 0 || cbk%1000 == 0 {
if one == 0 {
one = stime
}
x := time.Now().Unix() - one
utils.Log.Info("channel : ", chann, " start time : ", stime, ";时长:", x, ";id:", sid, ";index", cbk, ";one:", one)
}
}
func run1() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err) // 这里的err其实就是panic传入的内容,挂掉后1秒后 重新执行自己
time.Sleep(1 * time.Second)
run1()
}
}()
var i int
var pool = newPool()
con := pool.Get()
fmt.Println(con)
defer con.Close()
c := redis.PubSubConn{con}
c.Subscribe("chat")
for {
i++
receive := c.Receive()
if receive == nil {
panic("receive err.Error()")
//continue fe
}
switch res := receive.(type) {
case redis.Message:
channel := (*string)(unsafe.Pointer(&res.Channel))
message := (*string)(unsafe.Pointer(&res.Data))
//fmt.Println("--------------------")
go TestCallback2(*channel, *message)
case redis.Subscription:
fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
case error:
// 必须要先声明defer,否则不能捕获到panic异常
fmt.Println("断网 服务器挂掉")
panic(res.Error())
}
}
}
func main() {
utils.Log.Info("===========main start============")
//接收订阅消息
go run1()
//发布订阅消息
time.Sleep(1 * time.Second)
pus()
}
// 生成唯一id 方便记录跟踪
var idx int64
func pus() {
defer func() {
fmt.Println("pus defer")
time.Sleep(1 * time.Second)
if err := recover(); err != nil {
fmt.Println(err) // 这里的err其实就是panic传入的内容,55
}
pus()
}()
//var i int64
var pool = newPool()
c := pool.Get()
defer c.Close()
start := time.Now().Unix()
fmt.Println(start)
for {
idx++
msg := buffer.Buffer{}
//buf := make([]byte, 8)
//binary.BigEndian.PutUint64(buf, uint64(start))
////写成函数调用总报错 不知道为啥
buf := utils.Int64ToBytes(start)
msg.Write(buf)
//msg.WriteString("aaaaaaaaaaaaaaa")
msg.WriteByte('\n')
//buf2 := make([]byte, 8)
//binary.BigEndian.PutUint64(buf2, uint64(idx))
buf2 := utils.Int64ToBytes(idx)
msg.Write(buf2)
//推送string 和bytes 性能几乎是一样的
data := string(msg.Bytes())
//用send 比do快5倍左右,实测到10秒发送92.5w,平均每秒9.2万,丢失2418条,do方法推送10秒16w条左右,丢失2条。
// 建议 发布方也加个订阅消息,接收方拿到消息回复一个状态,发送方定时轮训已发消息的状态,一定时间内没接收到,重新发一次。
//err := c.Send("Publish", "chat", data)
_, err := c.Do("Publish", "chat", data)
if err != nil {
idx--
panic(err)
//return "", err
} else {
//send 方法 需要使用 c.Flush() 推送
//err2 := c.Flush()
//if err2 != nil {
// idx--
// panic(err2)
//}
}
if idx%10000 == 0 {
//time.Sleep(1 * time.Second)
}
//fmt.Println(idx)
//time.Sleep(1 * time.Millisecond)
//return result, nil
}
}
网友评论