美文网首页
聊聊cortex的Ingester

聊聊cortex的Ingester

作者: go4it | 来源:发表于2021-01-24 23:46 被阅读0次

    本文主要研究一下cortex的Ingester

    Ingester

    cortex/pkg/api/api.go

    // Ingester describes what the API layer requires from an ingester. Because
    // it is an interface, alternative ingester implementations can be handed to
    // the API.RegisterIngester() method.
    type Ingester interface {
        // The ingester service's server API is embedded as-is.
        client.IngesterServer

        // Push is spelled out explicitly here in addition to the embedded
        // client.IngesterServer.
        Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error)

        // HTTP handlers exposed by the ingester.
        FlushHandler(w http.ResponseWriter, r *http.Request)
        ShutdownHandler(w http.ResponseWriter, r *http.Request)
    }
    

    Ingester接口内嵌了client.IngesterServer,定义了FlushHandler、ShutdownHandler、Push方法

    client.IngesterServer

    cortex/pkg/ingester/client/cortex.pb.go

    // IngesterServer is the server API for Ingester service.
    //
    // Most methods take a context plus a request message and return a response
    // message and an error. QueryStream and TransferChunks instead receive a
    // stream server handle and return only an error.
    //
    // NOTE(review): this type lives in cortex.pb.go, so it is presumably
    // generated from a protobuf definition — confirm, and if so change the
    // definition rather than hand-editing this interface.
    type IngesterServer interface {
        // Push takes a WriteRequest and returns a WriteResponse.
        Push(context.Context, *WriteRequest) (*WriteResponse, error)
        // Query takes a QueryRequest and returns a QueryResponse.
        Query(context.Context, *QueryRequest) (*QueryResponse, error)
        // QueryStream takes a QueryRequest and a stream server handle, and
        // returns only an error.
        QueryStream(*QueryRequest, Ingester_QueryStreamServer) error
        // LabelValues takes a LabelValuesRequest and returns a LabelValuesResponse.
        LabelValues(context.Context, *LabelValuesRequest) (*LabelValuesResponse, error)
        // LabelNames takes a LabelNamesRequest and returns a LabelNamesResponse.
        LabelNames(context.Context, *LabelNamesRequest) (*LabelNamesResponse, error)
        // UserStats takes a UserStatsRequest and returns a UserStatsResponse.
        UserStats(context.Context, *UserStatsRequest) (*UserStatsResponse, error)
        // AllUserStats takes the same UserStatsRequest as UserStats but returns
        // a UsersStatsResponse.
        AllUserStats(context.Context, *UserStatsRequest) (*UsersStatsResponse, error)
        // MetricsForLabelMatchers takes a MetricsForLabelMatchersRequest and
        // returns a MetricsForLabelMatchersResponse.
        MetricsForLabelMatchers(context.Context, *MetricsForLabelMatchersRequest) (*MetricsForLabelMatchersResponse, error)
        // MetricsMetadata takes a MetricsMetadataRequest and returns a
        // MetricsMetadataResponse.
        MetricsMetadata(context.Context, *MetricsMetadataRequest) (*MetricsMetadataResponse, error)
        // TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server).
        TransferChunks(Ingester_TransferChunksServer) error
    }
    

    client.IngesterServer接口定义了Push、Query、QueryStream、LabelValues、LabelNames、UserStats、AllUserStats、MetricsForLabelMatchers、MetricsMetadata、TransferChunks方法

    Push

    cortex/pkg/ingester/ingester.go

    // Push implements client.IngesterServer.
    //
    // The request is rejected when checkRunningOrStopping reports an error and
    // is handed to v2Push when blocks storage is enabled. Otherwise every sample
    // of every series in req is passed to i.append and, when the WAL is enabled,
    // the accumulated record is logged once all series have been processed.
    //
    // Return values:
    //   - (nil, err) when the ingester state check fails, the tenant ID cannot
    //     be resolved, a non-validation append error occurs, or WAL logging fails.
    //   - (&client.WriteResponse{}, err) when one or more samples failed with a
    //     *validationError; the remaining samples are still processed and only
    //     the first such error is reported.
    //   - (&client.WriteResponse{}, nil) otherwise.
    func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
        if err := i.checkRunningOrStopping(); err != nil {
            return nil, err
        }

        if i.cfg.BlocksStorageEnabled {
            return i.v2Push(ctx, req)
        }

        // NOTE: because we use `unsafe` in deserialisation, we must not
        // retain anything from `req` past the call to ReuseSlice
        defer client.ReuseSlice(req.Timeseries)

        userID, err := tenant.TenantID(ctx)
        if err != nil {
            // Keep the "no user id" prefix, but wrap the cause instead of
            // discarding it so callers can still inspect it with errors.Is/As.
            return nil, fmt.Errorf("no user id: %w", err)
        }

        // Given metadata is a best-effort approach, and we don't halt on errors
        // process it before samples. Otherwise, we risk returning an error before ingestion.
        i.pushMetadata(ctx, userID, req.GetMetadata())

        var firstPartialErr *validationError
        var record *WALRecord
        if i.cfg.WALConfig.WALEnabled {
            record = recordPool.Get().(*WALRecord)
            // Hand the record back to the pool on every exit path, including
            // the early error returns below, not only after a successful Log.
            // NOTE(review): assumes i.wal.Log does not retain the record after
            // returning an error — confirm against its implementation.
            defer recordPool.Put(record)

            record.UserID = userID
            // Assuming there is not much churn in most cases, there is no use
            // keeping the record.Labels slice hanging around.
            record.Series = nil
            if cap(record.Samples) < len(req.Timeseries) {
                record.Samples = make([]tsdb_record.RefSample, 0, len(req.Timeseries))
            } else {
                record.Samples = record.Samples[:0]
            }
        }

        for _, ts := range req.Timeseries {
            seriesSamplesIngested := 0
            for _, s := range ts.Samples {
                // append() copies the memory in `ts.Labels` except on the error path
                err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
                if err == nil {
                    seriesSamplesIngested++
                    continue
                }

                i.metrics.ingestedSamplesFail.Inc()
                if ve, ok := err.(*validationError); ok {
                    // Validation failures do not abort the request: remember
                    // the first one and keep going with the next sample.
                    if firstPartialErr == nil {
                        firstPartialErr = ve
                    }
                    continue
                }

                // non-validation error: abandon this request
                return nil, grpcForwardableError(userID, http.StatusInternalServerError, err)
            }

            if i.cfg.ActiveSeriesMetricsEnabled && seriesSamplesIngested > 0 {
                // updateActiveSeries will copy labels if necessary.
                i.updateActiveSeries(userID, time.Now(), ts.Labels)
            }
        }

        if record != nil {
            // Reached only when no non-validation error aborted the loop above.
            if err := i.wal.Log(record); err != nil {
                return nil, err
            }
        }

        if firstPartialErr != nil {
            // grpcForwardableError turns the error into a string so it no longer references `req`
            return &client.WriteResponse{}, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr)
        }

        return &client.WriteResponse{}, nil
    }
    

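    Push方法首先执行checkRunningOrStopping,若i.cfg.BlocksStorageEnabled则执行i.v2Push(ctx, req);否则先执行i.pushMetadata,再遍历req.Timeseries对每个sample执行i.append:遇到validationError时只记录第一个错误并继续处理,遇到其他错误则直接返回;若启用了WAL,最后通过i.wal.Log(record)写入记录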

    FlushHandler

    cortex/pkg/ingester/flush.go

    // FlushHandler kicks off a flush of every in-memory chunk. It is mainly
    // meant for local testing. When blocks storage is enabled the request is
    // handled by v2FlushHandler instead.
    func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {
        if !i.cfg.BlocksStorageEnabled {
            level.Info(util.Logger).Log("msg", "starting to flush all the chunks")
            i.sweepUsers(true)
            level.Info(util.Logger).Log("msg", "chunks queued for flushing")

            // Nothing to return in the body; reply 204.
            w.WriteHeader(http.StatusNoContent)
            return
        }

        i.v2FlushHandler(w, r)
    }
    

    FlushHandler方法在i.cfg.BlocksStorageEnabled为true时执行i.v2FlushHandler(w, r);否则执行i.sweepUsers(true)将chunks加入flush队列,并返回http.StatusNoContent(204)

    ShutdownHandler

    cortex/pkg/ingester/ingester.go

    // ShutdownHandler stops the ingester in response to an HTTP request. In order, it:
    //     * Changes the state of ring to stop accepting writes.
    //     * Flushes all the chunks.
    // The lifecycler's flush/unregister settings are forced to true while the
    // service stops and are put back to their previous values afterwards.
    func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
        // Remember the current settings so they can be restored below.
        prevFlush := i.lifecycler.FlushOnShutdown()
        prevUnregister := i.lifecycler.ShouldUnregisterOnShutdown()

        // Flush the chunks if transfer fails, whatever the original flag said,
        // and always unregister when shutdown is requested over HTTP.
        i.lifecycler.SetFlushOnShutdown(true)
        i.lifecycler.SetUnregisterOnShutdown(true)

        // The result of stopping is deliberately discarded: the handler
        // replies 204 regardless.
        _ = services.StopAndAwaitTerminated(context.Background(), i)

        // Put both settings back the way they were.
        i.lifecycler.SetFlushOnShutdown(prevFlush)
        i.lifecycler.SetUnregisterOnShutdown(prevUnregister)

        w.WriteHeader(http.StatusNoContent)
    }
    

    ShutdownHandler方法先通过i.lifecycler.FlushOnShutdown()、i.lifecycler.ShouldUnregisterOnShutdown()保存原始设置,并将两者临时设置为true,然后执行services.StopAndAwaitTerminated停止ingester,最后恢复原始设置并返回http.StatusNoContent(204)

    Query

    cortex/pkg/ingester/ingester.go

    // Query implements service.IngesterServer.
    //
    // After the running-state check it delegates to v2Query when blocks storage
    // is enabled. Otherwise it walks the tenant's series that match the request
    // and copies the samples within the requested time range into the response.
    // An error is produced once the running sample total exceeds the tenant's
    // MaxSamplesPerQuery limit.
    func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
        if err := i.checkRunningOrStopping(); err != nil {
            return nil, err
        }
        if i.cfg.BlocksStorageEnabled {
            return i.v2Query(ctx, req)
        }

        userID, err := tenant.TenantID(ctx)
        if err != nil {
            return nil, err
        }
        from, through, matchers, err := client.FromQueryRequest(req)
        if err != nil {
            return nil, err
        }

        i.metrics.queries.Inc()

        // The read lock is held only for the state lookup itself.
        i.userStatesMtx.RLock()
        state, found, err := i.userStates.getViaContext(ctx)
        i.userStatesMtx.RUnlock()
        switch {
        case err != nil:
            return nil, err
        case !found:
            // No state for this tenant: reply with an empty response.
            return &client.QueryResponse{}, nil
        }

        resp := &client.QueryResponse{}
        seriesCount, sampleCount := 0, 0
        sampleLimit := i.limits.MaxSamplesPerQuery(userID)

        // collect is called for each matching series and adds its in-range
        // samples to resp.
        collect := func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error {
            values, err := series.samplesForRange(from, through)
            if err != nil {
                return err
            }
            // Series with nothing in range are skipped and not counted.
            if len(values) == 0 {
                return nil
            }

            // Counters are bumped before the limit check, so they include the
            // series that pushes the total over the limit.
            seriesCount++
            sampleCount += len(values)
            if sampleCount > sampleLimit {
                return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "exceeded maximum number of samples in a query (%d)", sampleLimit)
            }

            entry := client.TimeSeries{
                Labels:  client.FromLabelsToLabelAdapters(series.metric),
                Samples: make([]client.Sample, len(values)),
            }
            for idx, v := range values {
                entry.Samples[idx] = client.Sample{
                    Value:       float64(v.Value),
                    TimestampMs: int64(v.Timestamp),
                }
            }
            resp.Timeseries = append(resp.Timeseries, entry)
            return nil
        }
        err = state.forSeriesMatching(ctx, matchers, collect, nil, 0)

        // Metrics are observed whether or not the walk ended with an error.
        i.metrics.queriedSeries.Observe(float64(seriesCount))
        i.metrics.queriedSamples.Observe(float64(sampleCount))
        return resp, err
    }
    

    Query方法先判断i.checkRunningOrStopping();若i.cfg.BlocksStorageEnabled则执行i.v2Query(ctx, req);否则通过series.samplesForRange(from, through)获取数据

    小结

    cortex的api.Ingester接口内嵌了client.IngesterServer,定义了FlushHandler、ShutdownHandler、Push方法;pkg/ingester中的Ingester结构体实现了该接口。

    doc

    相关文章

      网友评论

          本文标题:聊聊cortex的Ingester

          本文链接:https://www.haomeiwen.com/subject/chovzktx.html