美文网首页
Boomer 实战,2w 并发轻松实现

Boomer 实战,2w 并发轻松实现

作者: 梨花菜 | 来源:发表于2020-08-05 01:55 被阅读0次

    压测背景

    接入第三方mqtt服务,目前公司设备超过10w台,并发预计4000rps

    工具选择

    Jmeter

    • 优点: 有现成Mqtt插件,开箱即用,支持分布式
    • 缺点: 施压需要消耗很大性能,插件不够灵活(可能是我不熟悉)

    Locust + Python

    • 优点: 很灵活,有现成Web界面
    • 缺点: 原生Locust不支持mqtt协议,需要重写HTTPLocust这个类.Python受限于GLI,并发不给力.需要起多个slave

    Locust + Boomer

    • 优点: boomergolang编写的,性能强劲,可搭配locust实现Web界面
    • 缺点: 缺少mqtt现成案例参考(我本身对于go也不算熟悉)

    一开始测试选了Jmeter,因为简单方便.但发现调试不是很方便,还是上面的,可能不熟悉.另外,50个并发左右,我的MBP(19款16寸,6核),就开始咆哮了!时间关系,我没深究原因.
    后来选择了Locust + Boomer.踩了不少坑,但最后总算完成了任务.

    压测分析

    • mqtt账号和主题是一一绑定,因此需要批量生成大概20w个账号

    压测场景

    账号建立连接

    • 300rps,5分钟
    • 500rps,5分钟
    • 1000rps,5分钟

    发送消息

    • 1000rps,5分钟
    • 2000rps,5分钟
    • 4000rps,5分钟

    脚本设计

    流程图

    image.png

    实现代码

    //  main.go
    // 代码仅供参考,无法直接运行.
    package main
    
    import (
        "bytes"
        "encoding/csv"
        "fmt"
        MQTT "github.com/eclipse/paho.mqtt.golang"
        "github.com/myzhan/boomer"
        "io"
        "io/ioutil"
        "log"
        "os"
        "strconv"
        "strings"
        "sync"
        "time"
    )
    
    var rows [][]string // 读取csv文件保存到这里
    var clientTopic []map[string]MQTT.Client
    var conn = 0 // 调试用
    var failCount = 0 // 初始化失败数量
    var i = 0 // 控制并发
    var j = 1 // 记录消息发送成功
    var f = 1 // 记录消息发送失败
    var nowStr = strconv.Itoa(int(time.Now().Unix())) // 当前时间戳,用来做后续查询的消息的标识符
    
    func newConn(c MQTT.Client, clientId string, group *sync.WaitGroup) {
        defer func() {
            group.Add(-1)
            err := recover()
            if err != nil {
                failCount++
                fmt.Println("login fail clientId:  ", clientId)
            }
        }()
        token := c.Connect()
        if token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
        // 组装topic
        topic := fmt.Sprintf("msg/%s/supply", clientId)
        temp := make(map[string]MQTT.Client)
        temp[topic] = c
        clientTopic = append(clientTopic, temp)
        conn++ // 调试用
    }
    
    func initClients() {
        var wg sync.WaitGroup 
        server := "server_ip:1883"
        for i := 0; i < len(rows); i++ {
            wg.Add(1)
            clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]
            opts := MQTT.NewClientOptions().AddBroker(server)
            opts.SetUsername(userName)
            opts.SetPassword(passWord)
            opts.SetClientID(clientId)
            opts.SetKeepAlive(300 * time.Second)
            c := MQTT.NewClient(opts)
            go newConn(c, clientId, &wg)
    
        }
        wg.Wait() // 等到所有协程执行完成 
        fmt.Printf("init finish, clients len is %d \n", len(clientTopic))
        fmt.Printf("conn: %d \n", conn)
        fmt.Printf("failCount: %d \n", failCount)
    }
    
    func initCsvData() {
        pwd, _ := os.Getwd()
        b, err := ioutil.ReadFile(pwd + "/clients.csv")
        fs := bytes.NewBuffer(b)
        if err != nil {
            log.Fatalf("can not open the file, err is %+v", err)
        }
    
        r := csv.NewReader(fs)
        //针对大文件,一行一行的读取文件
        for {
            row, err := r.Read()
            if err != nil && err != io.EOF {
                log.Fatalf("can not read, err is %+v", err)
            }
            if err == io.EOF {
                break
            }
            rows = append(rows, row)
        }
    }
    
    
    
    func login() {
        server := "server_ip:port"
        clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]
        start := time.Now()
        opts := MQTT.NewClientOptions().AddBroker(server)
        opts.SetUsername(userName)
        opts.SetPassword(passWord)
        opts.SetClientID(clientId)
        c := MQTT.NewClient(opts)
        token := c.Connect()
        elapsed := time.Since(start)
        if token.Error() == nil {
            log.Println("success" + strconv.Itoa(j))
            boomer.RecordSuccess("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))
        } else {
            log.Println(token.Error())
            boomer.RecordFailure("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), clientId)
        }
        c.Disconnect(5)
        // avoid out of array
        if i < len(clientTopic)-1 {
            i++
        } else {
            i = 0
        }
        j++
    }
    
    func sendMsg() {
        start := time.Now()
        msgId := "msg" + strconv.Itoa(i)
        var clientId string
        var topic string
        var c MQTT.Client
        for k, v := range clientTopic[i] {
            clientId = k[6:19]
            topic = k
            c = v // v就是一个connected的client
        }
        deviceTime := nowStr
        str := []string{msgId, clientId, deviceTime}
        msgPayload := strings.Join(str, "|")
    
        if c.IsConnected() == true {
            token := c.Publish(topic, 1, false, msgPayload)
            //token.Wait() 等待消息发送完成,这样会极大拉低并发
            elapsed := time.Since(start)
            if token.Error() == nil {
                fmt.Printf("this topic name is: %s \n", topic)
                fmt.Printf("this topic payload is: %s \n", msgPayload)
                fmt.Printf("success msg index: %v elapsed: %v  \n", j, elapsed)
                j++ // 消息发送成功, 记录一条,并且也给locust记录一条,方便后续校对数据量
                boomer.RecordSuccess("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), int64(j))
                // 避免数组越界
                if i < len(clientTopic)-1 {
                    i++
                } else {
                    i = 0
                }
            } else {
                boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)
                fmt.Printf("发送失败, fail msg index: %v \n", f)
            }
    
        } else {
            if token := c.Connect(); token.Wait() && token.Error() != nil {
                elapsed := time.Since(start)
                fmt.Printf("fail msg index: %v \n", f)
                f++
                boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)
            }
        }
    }
    
    func main() {
        initCsvData()
        initClients()
    
        task1 := &boomer.Task{
            Name:   "myTask",
            Weight: 1,
            Fn:     sendMsg,
        }
        
        //task2 := &boomer.Task{
        //  Name:   "login",
        //  Weight: 1,
        //  Fn:     login,
        //}
        boomer.Run(task1)
    }
    
    

    压测结果

    施压两台机:8核CPU,32G内存.
    就放一组最大值的,2w并发,10分钟.
    实际上内存消耗很小,反而是CPU拉满


    locust 施压机1 施压机2

    相关文章

      网友评论

          本文标题:Boomer 实战,2w 并发轻松实现

          本文链接:https://www.haomeiwen.com/subject/klagrktx.html