美文网首页
go influxdb

go influxdb

作者: 五岁小孩 | 来源:发表于2021-04-05 10:58 被阅读0次

go influxdb

package dao

import (
    "context"
    "eastwan.com/ew-tool/common"
    "encoding/json"
    "errors"
    "fmt"
    "github.com/astaxie/beego/logs"
    "github.com/influxdata/influxdb-client-go"
    "net/url"
    "time"
)

// myInfluxClient holds the package-wide InfluxDB client. It is nil until
// InitInfluxClient assigns it; GetInfluxClient returns it once set.
var myInfluxClient *influxdb2.Client

// InitInfluxClient builds a new InfluxDB client from DBConfig, stores it in
// the package-level myInfluxClient (replacing any previous value), and
// returns a pointer to it.
//
// The auth token is passed as "<InfluxUser>:<InfluxPass>".
//
// NOTE(review): the options start from a zero-valued Options struct rather
// than a library-provided default set, so any setting not configured below
// keeps its zero value — confirm that is intended.
func InitInfluxClient() *influxdb2.Client {
    opts := &influxdb2.Options{}

    // Log level: 0 - error, 1 - warning, 2 - info, 3 - debug.
    opts.SetLogLevel(0)
    opts.SetHttpRequestTimeout(10)

    // Batching and flushing.
    opts.SetBatchSize(100)
    opts.SetFlushInterval(3000)

    // Retry behaviour.
    opts.SetMaxRetries(3)
    opts.SetRetryInterval(1000)

    authToken := fmt.Sprintf("%s:%s", DBConfig.InfluxUser, DBConfig.InfluxPass)
    client := influxdb2.NewClientWithOptions(DBConfig.InfluxUrl, authToken, opts)

    myInfluxClient = &client
    return myInfluxClient
}

// GetInfluxClient returns the package-level InfluxDB client, creating it via
// InitInfluxClient the first time it is requested (i.e. while
// myInfluxClient is still nil).
func GetInfluxClient() *influxdb2.Client {
    if myInfluxClient == nil {
        return InitInfluxClient()
    }
    return myInfluxClient
}

//// Non-blocking, asynchronous write (****** memory issue unresolved; disabled for now ******)
//func NonBlockingInfluxWrite(databaseName string, retentionPolicy string, measurement string, tags *map[string]string, fields *map[string]interface{}, ts time.Time) {
//
//  client := *GetInfluxClient()
//
//  // Buckets only exist in InfluxDB 2.x. InfluxDB 1.x has no bucket concept, so "database name/retention policy" is used here instead.
//  writeApi := client.WriteApi("", fmt.Sprintf("%s/%s", databaseName, retentionPolicy))
//
//  // create point using full params constructor
//  p := influxdb2.NewPoint(measurement, *tags, *fields, ts)
//
//  // Write data
//  writeApi.WritePoint(p)
//
//}

// BlockingInfluxWrite writes a single point synchronously (blocking write).
//
// Buckets only exist in InfluxDB 2.x; InfluxDB 1.x has no bucket concept, so
// the bucket name passed to the client is "databaseName/retentionPolicy" and
// the organisation is left empty.
//
// Parameters:
//   - databaseName, retentionPolicy: combined into the bucket name.
//   - measurement: measurement the point belongs to.
//   - tags: optional tag set; a nil pointer is treated as "no tags".
//   - fields: field set of the point; must not be a nil pointer.
//   - ts: timestamp of the point.
//
// It returns an error when fields is nil or when the client reports a write
// failure.
func BlockingInfluxWrite(databaseName string, retentionPolicy string, measurement string, tags *map[string]string, fields *map[string]interface{}, ts time.Time) error {
    // Guard the pointer parameters: dereferencing a nil pointer would panic.
    if fields == nil {
        return errors.New("influx write fields is nil")
    }
    var tagSet map[string]string
    if tags != nil {
        tagSet = *tags
    }

    client := *GetInfluxClient()
    writeApi := client.WriteApiBlocking("", fmt.Sprintf("%s/%s", databaseName, retentionPolicy))

    // Create the point using the full-params constructor and write it.
    p := influxdb2.NewPoint(measurement, tagSet, *fields, ts)
    return writeApi.WritePoint(context.Background(), p)
}

// InfluxDbQueryResultSeries is one series in the JSON body returned by the
// InfluxDB /query endpoint.
type InfluxDbQueryResultSeries struct {
    // Name is decoded from the "name" key of the series.
    Name    string          `json:"name"`
    // Columns lists the column names; QueryInfluxArrays uses them as map keys.
    Columns []string        `json:"columns"`
    // Values holds the rows; each row is indexed in parallel with Columns.
    Values  [][]interface{} `json:"values"`
}
// InfluxDbQueryResultItem is one entry of the "results" array in the JSON
// body returned by the InfluxDB /query endpoint.
type InfluxDbQueryResultItem struct {
    // StatementId is decoded from the "statement_id" key.
    StatementId int                         `json:"statement_id"`
    // Series holds the series decoded from the "series" key.
    Series      []InfluxDbQueryResultSeries `json:"series"`
}

// InfluxDbQueryResult is the top-level JSON document decoded from an
// InfluxDB /query response.
type InfluxDbQueryResult struct {
    // Results holds the entries of the "results" array.
    Results []InfluxDbQueryResultItem `json:"results"`
}

// QueryInfluxDataByUrl runs a query against the InfluxDB HTTP /query
// endpoint and decodes the JSON response into an InfluxDbQueryResult.
//
// Equivalent request:
//
//  curl -XPOST 'http://localhost:8086/query?pretty=true' \
//    -H 'Authorization: Token <user>:<password>' \
//    -d "db=<db>&q=<query>"
//
// Parameters:
//   - db: database name; URL-escaped before being placed in the query string.
//   - query: query statement; URL-escaped as well.
//
// The returned result is never nil. The error is non-nil when the HTTP
// request fails, or when a non-empty response body cannot be decoded. An
// empty response body yields an empty result and a nil error.
func QueryInfluxDataByUrl(db string, query string) (*InfluxDbQueryResult, error) {
    result := &InfluxDbQueryResult{}

    headers := map[string]string{
        "Authorization": "Token " + DBConfig.InfluxUser + ":" + DBConfig.InfluxPass,
    }

    // Build the query string with url.Values so that both db and q are
    // escaped; Encode emits keys in sorted order, i.e. "db=...&q=...".
    params := url.Values{}
    params.Set("db", db)
    params.Set("q", query)

    respStr, err := common.HttpPost(DBConfig.InfluxUrl+"/query?"+params.Encode(), "", headers, "")
    if err != nil {
        return result, err
    }
    if common.IsEmpty(respStr) {
        return result, nil
    }

    if err := json.Unmarshal([]byte(respStr), result); err != nil {
        // Keep the existing log entry, but also surface the failure to the
        // caller instead of returning an empty result with a nil error.
        logs.Error(err)
        return result, fmt.Errorf("decoding influx query response: %w", err)
    }
    return result, nil
}

// QueryInfluxData runs a Flux query for the "snmp" measurement and prints
// every table header and row to standard output. Query failures are printed
// as well; nothing is returned.
//
// The bucket is given as "databaseName/retentionPolicy"; the organisation
// name is left empty because InfluxDB 1.x does not use it.
//
// timeRange is inserted verbatim into range(...), e.g. "start: -1h".
func QueryInfluxData(databaseName string, retentionPolicy string, timeRange string) {
    influx := *GetInfluxClient()
    api := influx.QueryApi("")

    bucket := fmt.Sprintf("%s/%s", databaseName, retentionPolicy)
    flux := `from(bucket:"` + bucket + `")|> range(` +
        timeRange + `) |> filter(fn: (r) => r._measurement == "snmp")`

    rows, queryErr := api.Query(context.Background(), flux)
    if queryErr != nil {
        fmt.Printf("Query error: %s\n", queryErr.Error())
        return
    }

    for rows.Next() {
        // Print the table metadata whenever a new table starts.
        if rows.TableChanged() {
            fmt.Printf("table: %s\n", rows.TableMetadata().String())
        }
        fmt.Printf("row: %s\n", rows.Record().String())
    }

    // Report an error hit while iterating the result, if any.
    if iterErr := rows.Err(); iterErr != nil {
        fmt.Printf("Query error: %s\n", iterErr.Error())
    }
}

func QueryInfluxArrays(db string, query string) ([]map[string]interface{}, error) {
    //curl -XPOST 'http://localhost:8086/query?pretty=true'
    //-H 'Authorization: Token admin:nfdas@576576'
    //-d "db=NFDAS_CACHE&q=select MEAN(Inbound) as InAvg,MEAN(Outbound) as OutAvg from snmp where "LinkId"='15' group by time(5m) ORDER BY time DESC limit 1"

    var respMaps []map[string]interface{}
    var columnsArr []string
    var valuesArr [][]interface{}
    result := &InfluxDbQueryResult{}

    headers := map[string]string{
        "Authorization": "Token " + DBConfig.InfluxUser + ":" + DBConfig.InfluxPass,
    }
    //body := utils.UrlEncode()

    //body := url.QueryEscape(fmt.Sprintf("db=%s&q=%s", db, query))
    body := ""

    respStr, err := common.HttpPost(DBConfig.InfluxUrl+"/query?db="+db+"&q="+url.QueryEscape(query), "", headers, body)
    if err != nil {
        return respMaps, err
    }
    if respStr == "" {
        err = errors.New("influx query data is null")
        return respMaps, err
    }
    err = json.Unmarshal([]byte(respStr), result)
    if err != nil {
        logs.Error(err)
        return respMaps, err
    }
    if len(result.Results) > 0 && len(result.Results[0].Series) > 0 {
        //获取列表字段数组
        columnsArr = result.Results[0].Series[0].Columns
        if len(columnsArr) <= 0 {
            err = errors.New("influx query data columns is null")
            return respMaps, err
        }
        //遍历字段数组,存储字段名称和索引为map

        //获取列表字段值二维数组
        valuesArr = result.Results[0].Series[0].Values
        if len(valuesArr) <= 0 {
            err = errors.New("influx query data is null")
            return respMaps, err
        }
        for _, valueArr := range valuesArr {
            var respMap = make(map[string]interface{})
            for index, col := range columnsArr {
                respMap[col] = valueArr[index]
            }
            respMaps = append(respMaps, respMap)
        }

    }
    return respMaps, err
}

相关文章

网友评论

      本文标题:go influxdb

      本文链接:https://www.haomeiwen.com/subject/yiugxltx.html