美文网首页
go并发编程案例解析2[添加监控]

go并发编程案例解析2[添加监控]

作者: 百炼 | 来源:发表于2019-01-05 21:41 被阅读0次

    date[2019-01-5]

    package main
    
    import (
        "bufio"
        "encoding/json"
        "flag"
        "fmt"
        "github.com/influxdata/influxdb/client/v2"
        "io"
        "log"
        "math/rand"
        "net/http"
        "net/url"
        "os"
        "regexp"
        "strconv"
        "strings"
        "time"
    )
    
    type LogProcess struct {
        rc chan []byte
        wc chan *Message
    
        read  Reader
        write Writer
    }
    
    type Reader interface {
        Read(rc chan []byte)
    }
    
    type Writer interface {
        Write(wc chan *Message)
    }
    
    type ReadDataFromFile struct {
        path string
    }
    type WriteDateToInfluxDb struct {
        influxDBua string
    }
    
    type Message struct {
        TimeLocal                    time.Time
        BytesSent                    int
        Path, Method, Scheme, Status string
        UpstreamTime, RequestTime    float64
    }
    
    const (
        TYPE_HANDLE_LINE = 0
        TYPE_ERR_NUM     = 1
    )
    
    var TypeMonitorChan = make(chan int, 200)
    //系统状态监控
    type SystemInfo struct {
        HandleLine   int     `json:"handleLine"`   //总处理日志行数
        Tps          float64 `json:"tps"`          //系统吞吐量
        ReadChanLen  int     `json:"readChanLen"`  //read channel长度
        WriteChanLen int     `json:"writeChanLen"` //write channel长度
        RunTime      string  `json:"runTime"`      //运行总时间
        ErrNum       int     `json:"errNum"`       //错误数
    }
    
    type Monitor struct {
        startTime time.Time
        data      SystemInfo
        tpsSli    []int
    }
    
    func (m *Monitor) start(lp *LogProcess) {
        go func() {
            for n := range TypeMonitorChan {
                switch n {
                case TYPE_ERR_NUM:
                    m.data.ErrNum += 1
                case TYPE_HANDLE_LINE:
                    m.data.HandleLine += 1
                }
            }
        }()
    
        ticker := time.NewTicker(time.Second * 5)
        go func() {
            for {
                <-ticker.C
                m.tpsSli = append(m.tpsSli, m.data.HandleLine)
                if len(m.tpsSli) > 2 {
                    m.tpsSli = m.tpsSli[1:]
                }
            }
        }()
        http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
            m.data.RunTime = time.Now().Sub(m.startTime).String()
            m.data.ReadChanLen = len(lp.rc)
            m.data.WriteChanLen = len(lp.wc)
    
            if len(m.tpsSli) >= 2 {
                m.data.Tps = float64(m.tpsSli[1]-m.tpsSli[0]) / 5
            }
    
            ret, _ := json.MarshalIndent(m.data, "", "\t")
            io.WriteString(writer, string(ret))
        })
    
        http.ListenAndServe(":9193", nil)
    }
    
    func (w *WriteDateToInfluxDb) Write(wc chan *Message) {
        //初始化influxdb client
        //从Write Channel读取数
        //Tags:Path,Method,Scheme,Status
        //Fiedls:
        //Time:
        //写入模块
        infSlic := strings.Split(w.influxDBua, "@")
    
        c, err := client.NewHTTPClient(client.HTTPConfig{
            Addr:     infSlic[0],
            Username: infSlic[1],
            Password: infSlic[2],
        })
        if err != nil {
            log.Fatal(err)
        }
        defer c.Close()
    
        for v := range wc {
            // Create a new point batch
            bp, err := client.NewBatchPoints(client.BatchPointsConfig{
                Database:  infSlic[3],
                Precision: infSlic[4],
            })
            if err != nil {
                log.Fatal(err)
            }
    
            // Create a point and add to batch
            //Tags:Path Method Scheme Status
    
            tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status,}
            fields := map[string]interface{}{
                "BytesSent":    v.BytesSent,
                "UpstreamTime": v.UpstreamTime,
                "RequestTime":  v.RequestTime,
            }
    
            pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
            if err != nil {
                log.Fatal(err)
            }
            bp.AddPoint(pt)
    
            // Write the batch
            if err := c.Write(bp); err != nil {
                log.Fatal(err)
            }
    
            // Close client resources
            if err := c.Close(); err != nil {
                log.Fatal(err)
            }
            log.Println("Write Success!")
        }
    }
    
    func (r *ReadDataFromFile) Read(rc chan []byte) {
        //读取模块
        //打开文件
        f, err := os.Open(r.path)
        if err != nil {
            panic(fmt.Sprintf("open file eror :%s", err.Error()))
        }
    
        //从文件末尾读取
        f.Seek(0, 2)
        rd := bufio.NewReader(f)
        for {
            line, err := rd.ReadBytes('\n')
            if err == io.EOF {
                TypeMonitorChan <- TYPE_ERR_NUM
                time.Sleep(500 * time.Millisecond)
                continue
            } else if err != nil {
                TypeMonitorChan <- TYPE_ERR_NUM
                panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
            }
            TypeMonitorChan <- TYPE_HANDLE_LINE
            rc <- line[:len(line)-1]
        }
    }
    
    func (lp *LogProcess) ProcessData() {
        //处理模块
        r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"`)
        rd := rand.New(rand.NewSource(time.Now().UnixNano()))
        loc, _ := time.LoadLocation("Asia/Shanghai")
        for v := range lp.rc {
            fmt.Println(string(v))
            ret := r.FindStringSubmatch(string(v))
            if len(ret) != 10 {
                TypeMonitorChan <- TYPE_ERR_NUM
                log.Println("FindStringSubmatch fail:", string(v))
                continue
            }
            message := &Message{}
            location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0800", ret[4], loc)
            if err != nil {
                TypeMonitorChan <- TYPE_ERR_NUM
                log.Println("ParseInLocation fail:", err.Error(), string(ret[4]))
    
            }
            message.TimeLocal = location
            byteSent, _ := strconv.Atoi(ret[8])
            message.BytesSent = byteSent
    
            //GET /foo?query=t HTTP/1.0
            reqSli := strings.Split(ret[5], " ")
            if len(reqSli) != 3 {
                TypeMonitorChan <- TYPE_ERR_NUM
                log.Println("strings.Split Fail", ret[5])
                continue
            }
    
            message.Method = reqSli[1]
            message.Scheme = reqSli[1]
            u, err := url.Parse(reqSli[2])
            if err != nil {
                TypeMonitorChan <- TYPE_ERR_NUM
                log.Println("url parse fail:", err)
            }
            message.Path = u.Path
            message.Status = ret[6]
    
            message.UpstreamTime = rd.Float64() * 4
            message.RequestTime = rd.Float64() * 4
            //message.UpstreamTime, _ = strconv.ParseFloat(ret[12], 64)
            //message.RequestTime, _ = strconv.ParseFloat(ret[13], 64)
            lp.wc <- message
        }
    }
    
    func main() {
        var path, influxDsn string
        flag.StringVar(&path, "path", "C:/soft/nginx-1.15.8/logs/access.log", "read file path")
        flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@imooc@imoocpass@imooc@s", "read influxdb datasource")
    
        r := &ReadDataFromFile{
            path: path,
        }
        w := &WriteDateToInfluxDb{
            influxDBua: influxDsn,
        }
        lp := &LogProcess{
            rc:    make(chan []byte, 200),
            wc:    make(chan *Message, 200),
            read:  r,
            write: w,
        }
    
        go lp.read.Read(lp.rc)
        for i := 0; i < 2; i++ {
            go lp.ProcessData()
        }
    
        for i := 0; i < 4; i++ {
            go lp.write.Write(lp.wc)
        }
    
        m := Monitor{
            startTime: time.Now(),
            data:      SystemInfo{},
        }
        m.start(lp)
        time.Sleep(time.Duration(30000000) * time.Second)
    }
    

    结果

    运行起来,在浏览器查看
    http://127.0.0.1:9193/monitor

    {
        "handleLine": 0,
        "tps": 0,
        "readChanLen": 0,
        "writeChanLen": 0,
        "runTime": "15.4403878s",
        "errNum": 31
    }
    

    相关文章

      网友评论

          本文标题:go并发编程案例解析2[添加监控]

          本文链接:https://www.haomeiwen.com/subject/upndrqtx.html