美文网首页
golang redigo 发布订阅实测每秒完成9.2万,记录一

golang redigo 发布订阅实测每秒完成9.2万,记录一

作者: yichen_china | 来源:发表于2023-09-15 17:52 被阅读0次

分别用了buffer和string 测试,结果几乎一致。
测试结果:

用send 比do快5倍左右,实测到10秒发送92.5w,平均每秒9.2万,do方法推送10秒16w条左右。

建议 发布方也加个订阅消息,接收方拿到消息回复一个状态,发送方定时轮训已发消息的状态,一定时间内没接收到,重新发一次,保障数据不丢失。
json.Marsha 序列化byte数组 ,使用分割符时候总是偶尔出现解包json的多出前缀空格或者分割符来,出现比例万分之几的样子,后来换成proto进行序列化解决了这个问题

我的电脑配置是i5-9400F 主频2.9 6核6线
我是单进程测试的。
id是发送方的记录条数,index是接收方记录的条数

使用Do方法的测试结果图

    //_, err := c.Do("Publish", "chat", data)
        if err != nil {
            idx--
            panic(err)
            //return "", err
        }
G6JC80Z}J5`GC{ZJ@%I_C75.png

下面是使用Send方法的测试结果

    err := c.Send("Publish", "chat", data)
        if err != nil {
            idx--
            panic(err)
            //return "", err
        } else {
            //send 方法 需要使用  c.Flush() 推送
            err2 := c.Flush()
            if err2 != nil {
                idx--
                panic(err2)
            }
        }
W{1F}(~4P`EOOOHAIOR(ME8.png

测试代码如下:

func newPool() *redis.Pool {
    return &redis.Pool{
        MaxIdle:   50,
        MaxActive: 50, // max number of connections
        Wait:      true,
        Dial: func() (redis.Conn, error) {
            var c redis.Conn
            var err error
            //for err != nil {
            c, err = redis.Dial("tcp", ":6379", redis.DialDatabase(1), redis.DialPassword("123456"))
            if err != nil {
                fmt.Println("-----------------------链接失败")
                panic(err.Error())
            }
            //}
            return c, err
        },
    }

}

var cbk int
var one int64

func TestCallback2(chann string, msg string) {
    //log.Println("testcallback ----", string(msg))
    cbk++
    buf := bytes.NewBufferString(msg)
    //bf.ReadBytes('\n')
    btime, err := buf.ReadBytes('\n')
    if err != nil {
        fmt.Println(err)
        return
    }
    stime := int64(binary.BigEndian.Uint64(btime))
    //id, err := buf.ReadBytes('\n')
    sid := int64(binary.BigEndian.Uint64(buf.Bytes()))
    //stime := string(btime)
    //sid := string(id)
    //stime := utils.BytesToInt64(btime)
    //sid := utils.BytesToInt64(id)
    if sid%1000 == 0 || cbk%1000 == 0 {
        if one == 0 {
            one = stime
        }
        x := time.Now().Unix() - one
        utils.Log.Info("channel : ", chann, " start time : ", stime, ";时长:", x, ";id:", sid, ";index", cbk, ";one:", one)
    }
}

func run1() {
    defer func() {
        if err := recover(); err != nil {
            fmt.Println(err) // 这里的err其实就是panic传入的内容,挂掉后1秒后 重新执行自己
            time.Sleep(1 * time.Second)
            run1()
        }
    }()
    var i int
    var pool = newPool()

    con := pool.Get()

    fmt.Println(con)
    defer con.Close()
    c := redis.PubSubConn{con}
    c.Subscribe("chat")
    for {
        i++
        receive := c.Receive()
        if receive == nil {
            panic("receive err.Error()")
            //continue fe
        }
        switch res := receive.(type) {
        case redis.Message:
            channel := (*string)(unsafe.Pointer(&res.Channel))
            message := (*string)(unsafe.Pointer(&res.Data))
            //fmt.Println("--------------------")
            go TestCallback2(*channel, *message)
        case redis.Subscription:
            fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
        case error:
            // 必须要先声明defer,否则不能捕获到panic异常
            fmt.Println("断网 服务器挂掉")
            panic(res.Error())
        }
    }

}
func main() {
    utils.Log.Info("===========main start============")
    //接收订阅消息
    go run1()
    //发布订阅消息
    time.Sleep(1 * time.Second)

    pus()

}

// 生成唯一id 方便记录跟踪
var idx int64

func pus() {
    defer func() {
        fmt.Println("pus defer")
        time.Sleep(1 * time.Second)
        if err := recover(); err != nil {
            fmt.Println(err) // 这里的err其实就是panic传入的内容,55
        }
        pus()
    }()

    //var i int64
    var pool = newPool()
    c := pool.Get()
    defer c.Close()
    start := time.Now().Unix()
    fmt.Println(start)

    for {
        idx++
        msg := buffer.Buffer{}
        //buf := make([]byte, 8)
        //binary.BigEndian.PutUint64(buf, uint64(start))
        ////写成函数调用总报错 不知道为啥
        buf := utils.Int64ToBytes(start)
        msg.Write(buf)
        //msg.WriteString("aaaaaaaaaaaaaaa")
        msg.WriteByte('\n')
        //buf2 := make([]byte, 8)
        //binary.BigEndian.PutUint64(buf2, uint64(idx))
        buf2 := utils.Int64ToBytes(idx)
        msg.Write(buf2)
        //推送string 和bytes 性能几乎是一样的
        data := string(msg.Bytes())
        //用send 比do快5倍左右,实测到10秒发送92.5w,平均每秒9.2万,丢失2418条,do方法推送10秒16w条左右,丢失2条。
        // 建议 发布方也加个订阅消息,接收方拿到消息回复一个状态,发送方定时轮训已发消息的状态,一定时间内没接收到,重新发一次。
        //err := c.Send("Publish", "chat", data)
        _, err := c.Do("Publish", "chat", data)
        if err != nil {
            idx--
            panic(err)
            //return "", err
        } else {
            //send 方法 需要使用  c.Flush() 推送
            //err2 := c.Flush()
            //if err2 != nil {
            //  idx--
            //  panic(err2)
            //}
        }
        if idx%10000 == 0 {
            //time.Sleep(1 * time.Second)
        }
        //fmt.Println(idx)

        //time.Sleep(1 * time.Millisecond)
        //return result, nil
    }
}

相关文章

  • golang 订阅发布机制实现

    大家使用较多的生产消费模式中间件,大都是订阅发布机制,反过来大家用这些中间件肯定都会用,一方subscribe(订...

  • golang nats[2] 发布订阅模式

    发布订阅-模式 要求:发布消息时,订阅者必须已经完成订阅且处于激活状态。注意:nc.Subscribe这个方法不是...

  • 2016巨献 一大波未实测平台整理发布 未实测

    国庆巨献 一大波未实测平台整理发布 未实测 国庆巨献一大波未实测平台整理发布未实测 http://royalstr...

  • 发布订阅模式和观察者模式

    发布/订阅模式 订阅者 发布者 事件中心 我们假定,存在一个“事件中心”,某个任务执行完成,就向事件中心“发布”(...

  • Kafka简介

    Kafka的主要特点:1. 为发布和订阅提供高吞吐量,每秒可产生25万消息(50MB),每秒可处理55万消息(11...

  • go-redis 发布/订阅

    go-redis 发布/订阅 最近golang 的项目开始使用redis 对于redis 真的是接触少,而且是go...

  • Golang RabbitMQ发布订阅模式(广播模式、fanou

    golang RabbitMQ发布订阅模式(广播模式、fanout模式),就是一个生产者发送的消息会被多个消费者处...

  • Kafka-核心API

    ** Kafka是一种分布式的,基于发布/订阅的消息系统,它可以让你发布和订阅记录流。在这方面,它类似...

  • golang 操作redis

    golang操作redis一般有两种库,一个是go-redis,一个是redigo,在这里我们主要介绍go-red...

  • [学习笔记]Rosserial实现Windows-ROS交互操作

    节点:代码中在Window下完成节点发布功能,在Linux下通过rostopic完成订阅操作节点发布cmd_vel...

网友评论

      本文标题:golang redigo 发布订阅实测每秒完成9.2万,记录一

      本文链接:https://www.haomeiwen.com/subject/pixbvdtx.html