go influxdb
package dao
import (
"context"
"eastwan.com/ew-tool/common"
"encoding/json"
"errors"
"fmt"
"github.com/astaxie/beego/logs"
"github.com/influxdata/influxdb-client-go"
"net/url"
"time"
)
var myInfluxClient *influxdb2.Client
func InitInfluxClient() *influxdb2.Client {
myInfluxOption := &influxdb2.Options{}
// 0 error, 1 - warning, 2 - info, 3 - debug
myInfluxOption.SetLogLevel(0)
myInfluxOption.SetHttpRequestTimeout(10)
myInfluxOption.SetBatchSize(100)
myInfluxOption.SetFlushInterval(3000)
myInfluxOption.SetMaxRetries(3)
myInfluxOption.SetRetryInterval(1000)
newInfluxClient := influxdb2.NewClientWithOptions(
DBConfig.InfluxUrl,
fmt.Sprintf("%s:%s", DBConfig.InfluxUser, DBConfig.InfluxPass),
myInfluxOption,
)
myInfluxClient = &newInfluxClient
return myInfluxClient
}
func GetInfluxClient() *influxdb2.Client {
if myInfluxClient != nil {
return myInfluxClient
} else {
return InitInfluxClient()
}
}
//// 非阻塞,异步写(******内存问题未解决,暂时不启用******)
//func NonBlockingInfluxWrite(databaseName string, retentionPolicy string, measurement string, tags *map[string]string, fields *map[string]interface{}, ts time.Time) {
//
// client := *GetInfluxClient()
//
// // influx 2.x 才有ibucket的概念。nflux 1.x 没有bucket的概念,此处 使用 database name/retention policy
// writeApi := client.WriteApi("", fmt.Sprintf("%s/%s", databaseName, retentionPolicy))
//
// // create point using full params constructor
// p := influxdb2.NewPoint(measurement, *tags, *fields, ts)
//
// // Write data
// writeApi.WritePoint(p)
//
//}
// 阻塞写,同步写
func BlockingInfluxWrite(databaseName string, retentionPolicy string, measurement string, tags *map[string]string, fields *map[string]interface{}, ts time.Time) error {
client := *GetInfluxClient()
// influx 2.x 才有ibucket的概念。nflux 1.x 没有bucket的概念,此处 使用 database name/retention policy
writeApi := client.WriteApiBlocking("", fmt.Sprintf("%s/%s", databaseName, retentionPolicy))
// create point using full params constructor
p := influxdb2.NewPoint(measurement, *tags, *fields, ts)
// Write data
err := writeApi.WritePoint(context.Background(), p)
if err != nil {
return err
}
return nil
}
type InfluxDbQueryResultSeries struct {
Name string `json:"name"`
Columns []string `json:"columns"`
Values [][]interface{} `json:"values"`
}
type InfluxDbQueryResultItem struct {
StatementId int `json:"statement_id"`
Series []InfluxDbQueryResultSeries `json:"series"`
}
type InfluxDbQueryResult struct {
Results []InfluxDbQueryResultItem `json:"results"`
}
func QueryInfluxDataByUrl(db string, query string) (*InfluxDbQueryResult, error) {
//curl -XPOST 'http://localhost:8086/query?pretty=true'
//-H 'Authorization: Token admin:nfdas@576576'
//-d "db=NFDAS_CACHE&q=select MEAN(Inbound) as InAvg,MEAN(Outbound) as OutAvg from snmp where "LinkId"='15' group by time(5m) ORDER BY time DESC limit 1"
result := &InfluxDbQueryResult{}
headers := map[string]string{
"Authorization": "Token " + DBConfig.InfluxUser + ":" + DBConfig.InfluxPass,
}
//body := utils.UrlEncode()
//body := url.QueryEscape(fmt.Sprintf("db=%s&q=%s", db, query))
body := ""
respStr, queryErr := common.HttpPost(DBConfig.InfluxUrl+"/query?db="+db+"&q="+url.QueryEscape(query), "", headers, body)
//url string, contentType string, headers map[string]string, body string) (content string, err error) {
if queryErr == nil && !common.IsEmpty(respStr) {
parseErr := json.Unmarshal([]byte(respStr), result)
if parseErr != nil {
logs.Error(parseErr)
}
}
return result, queryErr
}
// timeRangeStart 如:start: -1h
func QueryInfluxData(databaseName string, retentionPolicy string, timeRange string) {
client := *GetInfluxClient()
// Get query client. influx 1.x Org name is not used
queryApi := client.QueryApi("")
// Supply string in a form database/retention-policy as a bucket. Skip retention policy for the default one, use just a database name (without the slash character)
result, err := queryApi.Query(context.Background(), `from(bucket:"`+fmt.Sprintf("%s/%s", databaseName, retentionPolicy)+`")|> range(`+
timeRange+`) |> filter(fn: (r) => r._measurement == "snmp")`)
if err == nil {
for result.Next() {
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
fmt.Printf("row: %s\n", result.Record().String())
}
if result.Err() != nil {
fmt.Printf("Query error: %s\n", result.Err().Error())
}
} else {
fmt.Printf("Query error: %s\n", err.Error())
}
}
func QueryInfluxArrays(db string, query string) ([]map[string]interface{}, error) {
//curl -XPOST 'http://localhost:8086/query?pretty=true'
//-H 'Authorization: Token admin:nfdas@576576'
//-d "db=NFDAS_CACHE&q=select MEAN(Inbound) as InAvg,MEAN(Outbound) as OutAvg from snmp where "LinkId"='15' group by time(5m) ORDER BY time DESC limit 1"
var respMaps []map[string]interface{}
var columnsArr []string
var valuesArr [][]interface{}
result := &InfluxDbQueryResult{}
headers := map[string]string{
"Authorization": "Token " + DBConfig.InfluxUser + ":" + DBConfig.InfluxPass,
}
//body := utils.UrlEncode()
//body := url.QueryEscape(fmt.Sprintf("db=%s&q=%s", db, query))
body := ""
respStr, err := common.HttpPost(DBConfig.InfluxUrl+"/query?db="+db+"&q="+url.QueryEscape(query), "", headers, body)
if err != nil {
return respMaps, err
}
if respStr == "" {
err = errors.New("influx query data is null")
return respMaps, err
}
err = json.Unmarshal([]byte(respStr), result)
if err != nil {
logs.Error(err)
return respMaps, err
}
if len(result.Results) > 0 && len(result.Results[0].Series) > 0 {
//获取列表字段数组
columnsArr = result.Results[0].Series[0].Columns
if len(columnsArr) <= 0 {
err = errors.New("influx query data columns is null")
return respMaps, err
}
//遍历字段数组,存储字段名称和索引为map
//获取列表字段值二维数组
valuesArr = result.Results[0].Series[0].Values
if len(valuesArr) <= 0 {
err = errors.New("influx query data is null")
return respMaps, err
}
for _, valueArr := range valuesArr {
var respMap = make(map[string]interface{})
for index, col := range columnsArr {
respMap[col] = valueArr[index]
}
respMaps = append(respMaps, respMap)
}
}
return respMaps, err
}
网友评论