美文网首页
mimir查询主流程源码

mimir查询主流程源码

作者: wwq2020 | 来源:发表于2022-04-06 19:59 被阅读0次

    frontend http入口在pkg/api/api.go中

    func (a *API) RegisterQueryAPI(handler http.Handler, buildInfoHandler http.Handler) {
      ...
      a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/query"), handler, true, true, "GET", "POST")
      ...
    }
    

    请求随后进入frontend的HTTP处理器,在pkg/frontend/transport/handler.go中

    func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
      ...
      resp, err := f.roundTripper.RoundTrip(r)
      ...
    }
    

    frontend调用到pkg/frontend/transport/roundtripper.go中

    func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
      ...
    
      resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
      if err != nil {
        return nil, err
      }
      ...
    }
    

    frontend调用到pkg/frontend/v2/frontend.go中

    func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
      ...
    case f.requestsCh <- freq:
      ...
    case resp := <-freq.response:
            if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
                stats := stats.FromContext(ctx)
                stats.Merge(resp.Stats) // Safe if stats is nil.
            }
    
            return resp.HttpResponse, nil
        }
      ...
    }
    

    frontend通过chan(frontend侧的requestsCh,即worker侧的requestCh)把请求交给scheduler worker,由其发送给scheduler,在pkg/frontend/v2/frontend_scheduler_worker.go中

    func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) error {
      ...
    
            case req := <-w.requestCh:
                err := loop.Send(&schedulerpb.FrontendToScheduler{
                    Type:            schedulerpb.ENQUEUE,
                    QueryID:         req.queryID,
                    UserID:          req.userID,
                    HttpRequest:     req.request,
                    FrontendAddress: w.frontendAddr,
                    StatsEnabled:    req.statsEnabled,
                })
      ...
    }
    

    scheduler通过grpc连接接收到查询请求,在pkg/scheduler/scheduler.go中

    func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error {
      ...
        case schedulerpb.ENQUEUE:
                err = s.enqueueRequest(frontendCtx, frontendAddress, msg)
      ...
    }
    

    scheduler将请求推送给querier,同样在pkg/scheduler/scheduler.go中

    func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer) error {
      ...
            req, idx, err := s.requestQueue.GetNextRequestForQuerier(querier.Context(), lastUserIndex, querierID)
            if err != nil {
                return err
            }
      ...
            if err := s.forwardRequestToQuerier(querier, r); err != nil {
                return err
            }
      ...
    }
    
    
    func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest) error {
      ...
        go func() {
            err := querier.Send(&schedulerpb.SchedulerToQuerier{
                UserID:          req.userID,
                QueryID:         req.queryID,
                FrontendAddress: req.frontendAddress,
                HttpRequest:     req.request,
                StatsEnabled:    req.statsEnabled,
            })
            if err != nil {
                errCh <- err
                return
            }
    
            _, err = querier.Recv()
            errCh <- err
        }()
      ...
    }
    

    querier通过grpc接收到查询请求,在pkg/querier/worker/scheduler_processor.go中

    func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
      ...
            c, err := schedulerClient.QuerierLoop(ctx)
      ...
    
            if err := sp.querierLoop(c, address); err != nil {
                level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
                backoff.Wait()
                continue
            }
      ...
        }
    }
    
    func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string) error {
      ...
      sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)
    
      ...
    }
    
    
    

    querier通过grpc发送查询响应给frontend,在pkg/querier/worker/scheduler_processor.go中

    func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest) {
      ...
    c, err := sp.frontendPool.GetClientFor(frontendAddress)
        if err == nil {
            // Response is empty and uninteresting.
            _, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{
                QueryID:      queryID,
                HttpResponse: response,
                Stats:        stats,
            })
        }
      ...
    }
    

    frontend通过grpc获取到查询响应,在pkg/frontend/v2/frontend.go中

    func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
      ...
        req := f.requests.get(qrReq.QueryID)
      ...
            case req.response <- qrReq:
      ...
    }
    

    相关文章

      网友评论

          本文标题:mimir查询主流程源码

          本文链接:https://www.haomeiwen.com/subject/izhdsrtx.html