美文网首页
聊聊loki的Query

聊聊loki的Query

作者: go4it | 来源:发表于2021-01-28 23:29 被阅读0次

    本文主要研究一下loki的Query

    Query

    loki/pkg/logql/engine.go

    // Query is a LogQL query to be executed.
    //
    // It exposes a single method, Exec, which runs the query and reports the
    // outcome as a Result together with an error value.
    type Query interface {
        // Exec processes the query under ctx and returns the Result along with
        // any error encountered while processing.
        Exec(ctx context.Context) (Result, error)
    }
    
    // Result is the result of a query execution. It pairs the evaluated value
    // with the statistics recorded for that execution.
    type Result struct {
        Data       promql_parser.Value // value produced by evaluating the query
        Statistics stats.Result        // statistics snapshot taken for the execution
    }
    

    Query接口定义了Exec方法,返回Result;Result定义了Data、Statistics属性

    Exec

    loki/pkg/logql/engine.go

    // Exec implements Query. It wraps the evaluation with instrumentation (a span
    // logger, a latency timer labelled by range type, and a statistics snapshot)
    // and delegates the actual work to Eval.
    //
    // The returned Result always carries the statistics snapshot; the error is
    // whatever Eval returned.
    func (q *query) Exec(ctx context.Context) (Result, error) {
        log, ctx := spanlogger.New(ctx, "query.Exec")
        defer log.Finish()

        // Observe the total execution time, labelled by the query's range type.
        timer := prometheus.NewTimer(queryTime.WithLabelValues(string(GetRangeType(q.params))))
        defer timer.ObserveDuration()

        // Install a fresh statistics context before evaluating so that the
        // snapshot below covers this execution.
        began := time.Now()
        ctx = stats.NewContext(ctx)

        data, err := q.Eval(ctx)

        statResult := stats.Snapshot(ctx, time.Since(began))
        statResult.Log(level.Debug(log))

        // Derive the status label: success, client-side error, or anything else.
        var status string
        switch {
        case err == nil:
            status = "200"
        case errors.Is(err, ErrParse), errors.Is(err, ErrPipeline), errors.Is(err, ErrLimit):
            status = "400"
        default:
            status = "500"
        }

        if q.record {
            RecordMetrics(ctx, q.params, status, statResult)
        }

        result := Result{Data: data, Statistics: statResult}
        return result, err
    }
    

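    Exec方法执行q.Eval(ctx)及stats.Snapshot,根据err确定status(无错误为200;ErrParse、ErrPipeline、ErrLimit为400;其他错误为500),并在q.record为true时调用RecordMetrics,最后返回包含Data与Statistics的Result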

    Eval

    loki/pkg/logql/engine.go

    // Eval parses the query string held in q.params and evaluates the resulting
    // expression. The whole evaluation runs under a context bounded by q.timeout.
    //
    // A SampleExpr is handed to q.evalSample. A LogSelectorExpr is turned into an
    // iterator via q.evaluator.Iterator and read with readStreams. Any other
    // expression type produces an error that names the offending type.
    func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
        ctx, cancel := context.WithTimeout(ctx, q.timeout)
        defer cancel()

        expr, err := q.parse(ctx, q.params.Query())
        if err != nil {
            return nil, err
        }

        switch e := expr.(type) {
        case SampleExpr:
            value, err := q.evalSample(ctx, e)
            return value, err

        case LogSelectorExpr:
            iter, err := q.evaluator.Iterator(ctx, e, q.params)
            if err != nil {
                return nil, err
            }

            // Close the iterator on return; iter.Close is handed to the helper
            // together with the "closing iterator" message.
            defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
            streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval())
            return streams, err
        default:
            // errors.New performs no formatting, so the %T verb would appear
            // literally in the message; fmt.Errorf substitutes the dynamic type.
            return nil, fmt.Errorf("Unexpected type (%T): cannot evaluate", e)
        }
    }
    

    Eval方法先执行q.parse将查询解析为Expr,之后根据Expr的类型做不同处理:如果是SampleExpr类型则执行q.evalSample;如果是LogSelectorExpr类型则执行q.evaluator.Iterator获取迭代器,再通过readStreams读取结果;其他类型则返回错误

    Snapshot

    loki/pkg/logql/stats/context.go

    // Snapshot assembles the statistics Result for the execution tracked by ctx.
    //
    // It starts from the data decoded by decodeTrailers, overlays the store and
    // chunk counters found in ctx (when present), and then either merges the
    // outcome into the Result already registered in ctx or, if GetResult fails,
    // returns the freshly built one. In both cases ComputeSummary is invoked with
    // execTime before returning.
    func Snapshot(ctx context.Context, execTime time.Duration) Result {
        // Ingester data is decoded from gRPC trailers.
        snap := decodeTrailers(ctx)

        // Store-level counters.
        if store, found := ctx.Value(storeKey).(*StoreData); found {
            snap.Store.TotalChunksRef = store.TotalChunksRef
            snap.Store.TotalChunksDownloaded = store.TotalChunksDownloaded
            snap.Store.ChunksDownloadTime = store.ChunksDownloadTime.Seconds()
        }

        // Counters gathered while iterating over chunks.
        if chunks, found := ctx.Value(chunksKey).(*ChunkData); found {
            snap.Store.HeadChunkBytes = chunks.HeadChunkBytes
            snap.Store.HeadChunkLines = chunks.HeadChunkLines
            snap.Store.DecompressedBytes = chunks.DecompressedBytes
            snap.Store.DecompressedLines = chunks.DecompressedLines
            snap.Store.CompressedBytes = chunks.CompressedBytes
            snap.Store.TotalDuplicates = chunks.TotalDuplicates
        }

        // Prefer merging into the Result already held by ctx, if there is one.
        if prior, err := GetResult(ctx); err == nil {
            prior.Merge(snap)
            prior.ComputeSummary(execTime)
            return *prior
        }

        snap.ComputeSummary(execTime)
        return snap
    }
    

    Snapshot方法先通过decodeTrailers得到res,再从ctx.Value取出StoreData及ChunkData填充res.Store;然后通过GetResult取出已有的Result,若取出失败则直接对res执行ComputeSummary并返回,否则将res Merge到已有的Result后执行ComputeSummary并返回

    小结

    loki的Query接口定义了Exec方法,返回Result;Result定义了Data、Statistics属性;query实现了Query接口,其Exec方法执行q.Eval(ctx)及stats.Snapshot。

    doc

    相关文章

      网友评论

          本文标题:聊聊loki的Query

          本文链接:https://www.haomeiwen.com/subject/tfeftltx.html