美文网首页golang学习篇章
go客户端并发http请求

go客户端并发http请求

作者: Best博客 | 来源:发表于2021-01-25 20:23 被阅读0次

    http客户端并发访问一个服务端

    当你要通过http请求群发100万邮件(发送邮件的服务器不需要你考虑资源消耗),越快发送完越好,很自然你会通过调节http.client的pool资源池的大小,从而单位时间内发送更多的请求出去,这里便是值得注意的地方,你要向同一个目标服务器发送100万请求,哪怕你把pool调大到500,想的是顺时hi并发100个请求发送出去,但是其实只有2个,因为http.client中MaxIdleConnsPerHost默认值为2,它的作用是像同一个目标机器发送请求的最大并发量。

    image.png

    以下实战demo

    package main
    
    import (
        "bytes"
        "encoding/json"
        "fmt"
        "github.com/360EntSecGroup-Skylar/excelize/v2"
        "github.com/go-resty/resty/v2"
        "net/http"
        "os"
        "strings"
        "sync"
        "time"
    )
    
    var client = resty.New().SetDebug(false).SetHeader("Content-Type", "application/json").SetTimeout(10 * time.Second)
    
    func init() {
        t2 := http.DefaultTransport.(*http.Transport).Clone()
        t2.MaxIdleConns = 100
        t2.MaxConnsPerHost = 100
        t2.MaxIdleConnsPerHost = 100  //无论你连接池多大,但是如果你是并发一个服务器,请调大次参数,它是允许你客户端向同一个服务端发送并发请求个数的控制参数,连接池100,是针对100个不同服务器的并发个数控制
        client.SetTransport(t2)
    }
    
    type Task struct {
        textgoApi   string
        filePath    string
        sheets      map[string][]int
        result      chan string
        result2     chan string
        resultPath  string
        resultPath2 string
    }
    
    func main() {
        task := Task{
            filePath:    "",
            sheets:      map[string][]int{},
            result:      make(chan string, 100),
            result2:     make(chan string, 100),
            textgoApi:   "",
            resultPath:  "",
            resultPath2: "",
        }
        fmt.Println("start run***")
        task.run()
        fmt.Println("start end***")
    }
    
    func (t *Task) run() {
    
        f, err := excelize.OpenFile(t.filePath)
        if err != nil {
            fmt.Println(err, "run openfile发生了错误")
            return
        }
        go func() {
            defer func() {
                close(t.result)
                close(t.result2)
            }()
    
            for key, val := range t.sheets {
                fmt.Println(key, val)
                t.handle(f, key, val)
            }
        }()
    
        wg := &sync.WaitGroup{}
    
        wg.Add(2)
        go func() {
            defer wg.Done()
    
            n := 0
            for res := range t.result2 {
                n++
                t.appendWrite(t.resultPath2, res)
            }
            t.appendWrite(t.resultPath2, fmt.Sprintf("总共没违规合计:%d", n))
        }()
    
        go func() {
            defer wg.Done()
    
            num := 0
            for res := range t.result {
                num++
                t.appendWrite(t.resultPath, res)
            }
            t.appendWrite(t.resultPath, fmt.Sprintf("总共违规合计:%d", num))
        }()
    
        wg.Wait()
    
    }
    
    func (t *Task) handle(file *excelize.File, sheet string, columns []int) {
        rows, err := file.Rows(sheet)
        if err != nil {
            fmt.Println(err, "file.Rows(sheet)")
            return
        }
        for rows.Next() {
            data, err := rows.Columns()
            if err != nil {
                fmt.Println(err, "rows.Columns()")
                continue
            }
            t.call(data, columns)
        }
    }
    
    func (t *Task) call(data []string, columns []int) {
        defer func() {
            if err := recover(); err != nil {
                fmt.Println(err, "recover---")
            }
        }()
        for k, v := range data {
            if t.exitsElem(columns, k+1) {
    
                strs := strings.Split(v, ";")
    
                isVio, s := t.isViolation(strs)
    
                if isVio {
                    //违规
                    t.result <- fmt.Sprintf("违规类容:%s           |--|--|          原始类容:  %s", s, strings.Join(data, ""))
                    return
                }
    
            }
        }
        t.result2 <- fmt.Sprintf("没有违规类容:%s           |--|--|  ", strings.Join(data, ""))
    
    }
    
    func (t *Task) isViolation(text []string) (b bool, s string) {
        for _, v := range text {
            if v == "" {
                continue
            }
            resp := t.isVoi(v)
            if resp {
                s = v
                b = true
                return
            }
        }
        return
    }
    
    type RequestParam struct {
        Text string `json:"text"`
    }
    
    type Resp struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
        Data struct {
            OutputInfo []RespData `json:"output_info"`
        } `json:"data"`
    }
    
    type RespData struct {
        IsIllegal   int     `json:"is_illegal"`
        IllegalType string  `json:"illegal_type"`
        Label       int     `json:"label"`
        Score       float64 `json:"score"`
    }
    
    //{
    //    "code": 0,
    //    "msg": "success",
    //    "data": {
    //        "output_info": [
    //            {
    //                "is_illegal": 1,
    //                "illegal_type": "涉政",
    //                "label": 3,
    //                "score": 0.99994314
    //            }
    //        ]
    //    }
    //}
    func (t *Task) isVoi(text string) (b bool) {
        texts := []string{}
        byteText := []rune(text)
        by := bytes.Buffer{}
        strText := ""
        for _, v := range byteText {
            by.WriteRune(v)
            strText = by.String()
            if len(strText) > 90 {
                texts = append(texts, strText)
                by = bytes.Buffer{}
            }
        }
        texts = append(texts, strText)
    
        for _, ttt := range texts {
            b = t.post(ttt)
            if b {
                return
            }
        }
        return
    }
    
    func (t *Task) post(ttt string) (b bool) {
        data := Resp{}
        for i := 0; i < 3; i++ {
            reqParam := RequestParam{Text: ttt}
            req, _ := json.Marshal(reqParam)
    
            resp, err := client.R().SetBody(string(req)).Post(t.textgoApi)
            if err != nil {
                fmt.Println(err, "= client.R().SetBody(string(req)).Post(t.textgoApi)")
                return
            }
            err = json.Unmarshal(resp.Body(), &data)
            if err != nil {
                return
            }
            if data.Code != 0 {
                time.Sleep(time.Millisecond * 100)
                fmt.Println("我是3次都失败了的数据", data)
                continue
            }
    
            if len(data.Data.OutputInfo) <= 0 {
                return
            }
    
            if data.Data.OutputInfo[0].IsIllegal == 1 {
                b = true
                return
            }
            return
        }
        fmt.Println("我是3次都失败了的数据", data, ttt)
        return
    }
    
    func (t *Task) exitsElem(data []int, i int) (b bool) {
        for _, v := range data {
            if v == i {
                return true
            }
        }
        return
    }
    
    //追加写
    func (t *Task) appendWrite(path string, content string) (err error) {
        f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0644)
        if err != nil {
            return
        }
        defer f.Close()
    
        _, err = fmt.Fprintln(f, content)
        return
    }
    
    // Pool Goroutine Pool
    type Pool struct {
        queue chan int
        wg    *sync.WaitGroup
    }
    
    // newPoll 新建一个协程池
    func newPoll(size int) *Pool {
        if size <= 0 {
            size = 1
        }
        return &Pool{
            queue: make(chan int, size),
            wg:    &sync.WaitGroup{},
        }
    }
    
    // Add 新增一个执行
    func (p *Pool) Add(delta int) {
        // delta为正数就添加
        for i := 0; i < delta; i++ {
            p.queue <- 1
        }
        // delta为负数就减少
        for i := 0; i > delta; i-- {
            <-p.queue
        }
        p.wg.Add(delta)
    }
    
    // Done 执行完成减一
    func (p *Pool) Done() {
        <-p.queue
        p.wg.Done()
    }
    
    // Wait 等待Goroutine执行完毕
    func (p *Pool) Wait() {
        p.wg.Wait()
    }
    
    
    

    参考文献
    原来这样使用 Go HTTP 客户端才能获取更高性能

    相关文章

      网友评论

        本文标题:go客户端并发http请求

        本文链接:https://www.haomeiwen.com/subject/ssmbzktx.html