frontend http入口在pkg/api/api.go中
// RegisterQueryAPI registers the query HTTP routes on the API. This abridged
// excerpt shows only the instant-query endpoint being wired to handler.
func (a *API) RegisterQueryAPI(handler http.Handler, buildInfoHandler http.Handler) {
...
// The route path is the configured Prometheus HTTP prefix joined with
// "/api/v1/query"; it is registered for both GET and POST.
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/query"), handler, true, true, "GET", "POST")
...
}
请求随后进入 frontend 的 pkg/frontend/transport/handler.go 中:
// ServeHTTP is the HTTP entry point of the frontend handler. In this abridged
// excerpt it hands the incoming request to the handler's round tripper.
func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
...
// Delegate the request to the configured round tripper and collect the
// response (or error) it produces.
resp, err := f.roundTripper.RoundTrip(r)
...
}
frontend调用到pkg/frontend/transport/roundtripper.go中
// RoundTrip adapts an HTTP round trip onto the wrapped gRPC round tripper.
// On failure it returns a nil response together with the error.
func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
...
// req is built in the elided code above — presumably the httpgrpc form of r;
// verify against the full source. The HTTP request's context is propagated.
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
if err != nil {
return nil, err
}
...
}
frontend调用到pkg/frontend/v2/frontend.go中
// RoundTripGRPC submits a request through the frontend's request channel and
// waits for its response. The excerpt shows two select cases from the elided
// body: publishing the request and receiving the result.
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
...
// Publish the request (freq, built in elided code) on the requests channel.
case f.requestsCh <- freq:
...
// A result has arrived on this request's response channel.
case resp := <-freq.response:
if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
// NOTE: the local variable shadows the stats package for the rest of this block.
stats := stats.FromContext(ctx)
stats.Merge(resp.Stats) // Safe if stats is nil.
}
// Only the HTTP response part of the result is returned to the caller.
return resp.HttpResponse, nil
}
...
}
frontend 通过 channel 把请求传递给 pkg/frontend/v2/frontend_scheduler_worker.go 中的 worker:
// schedulerLoop runs on an open FrontendLoop stream to a scheduler. The
// excerpt shows the case that takes a request from the worker's request
// channel and enqueues it on the scheduler via the stream.
func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) error {
...
// A request has been received from the worker's request channel.
case req := <-w.requestCh:
// Send an ENQUEUE message carrying the query ID, user ID, the HTTP request,
// this frontend's address, and the request's stats flag.
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.ENQUEUE,
QueryID: req.queryID,
UserID: req.userID,
HttpRequest: req.request,
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,
})
...
}
scheduler 通过 gRPC 连接接收到查询请求,位于 pkg/scheduler/scheduler.go 中:
// FrontendLoop serves the stream opened by a frontend. The excerpt shows the
// handling of ENQUEUE messages.
func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error {
...
// An ENQUEUE message is handed to enqueueRequest together with the frontend's
// context and address (both obtained in the elided code).
case schedulerpb.ENQUEUE:
err = s.enqueueRequest(frontendCtx, frontendAddress, msg)
...
}
scheduler 把请求推送给 querier,同样位于 pkg/scheduler/scheduler.go 中:
// QuerierLoop serves the stream opened by a querier. The excerpt shows it
// taking the next request for this querier from the request queue and
// forwarding it over the stream.
func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer) error {
...
// Fetch the next request for this querier, using the stream's context.
req, idx, err := s.requestQueue.GetNextRequestForQuerier(querier.Context(), lastUserIndex, querierID)
if err != nil {
return err
}
...
// r is defined in the elided code — presumably req converted to
// *schedulerRequest; verify against the full source.
if err := s.forwardRequestToQuerier(querier, r); err != nil {
return err
}
...
}
// forwardRequestToQuerier sends one request to the querier over its stream.
// The excerpt shows the goroutine that performs the send and then waits for
// the querier's next message, reporting the outcome on errCh (declared in the
// elided code).
func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest) error {
...
go func() {
// Forward the request's user ID, query ID, frontend address, HTTP request,
// and stats flag to the querier.
err := querier.Send(&schedulerpb.SchedulerToQuerier{
UserID: req.userID,
QueryID: req.queryID,
FrontendAddress: req.frontendAddress,
HttpRequest: req.request,
StatsEnabled: req.statsEnabled,
})
if err != nil {
errCh <- err
return
}
// Wait for the querier's next message; its content is discarded and only
// the resulting error (nil on success) is reported.
_, err = querier.Recv()
errCh <- err
}()
...
}
querier 通过 gRPC 接收到查询请求,位于 pkg/querier/worker/scheduler_processor.go 中:
// processQueriesOnSingleStream opens a QuerierLoop stream to the scheduler
// and processes requests on it. The retry loop itself is elided from this
// excerpt; the `continue` below belongs to it.
func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
...
// Open the QuerierLoop stream on the scheduler client.
c, err := schedulerClient.QuerierLoop(ctx)
...
// Run the request-processing loop; on error, log it, wait on the backoff,
// and go around the enclosing loop again.
if err := sp.querierLoop(c, address); err != nil {
level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
backoff.Wait()
continue
}
...
}
}
// querierLoop handles requests arriving on the scheduler stream c. The
// excerpt shows a received request being passed to runRequest.
func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string) error {
...
// Execute the request; the query ID, frontend address, stats flag, and HTTP
// request all come from the message received in the elided code.
sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)
...
}
querier 通过 gRPC 把查询响应发送给 frontend,位于 pkg/querier/worker/scheduler_processor.go 中:
// runRequest handles a single query request. The excerpt shows the final
// step: delivering the result to the frontend at frontendAddress. The
// response and stats values are produced in the elided code.
func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest) {
...
// Obtain a client for the originating frontend from the pool.
c, err := sp.frontendPool.GetClientFor(frontendAddress)
if err == nil {
// Response is empty and uninteresting.
// Report the result to the frontend, keyed by the query ID.
_, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{
QueryID: queryID,
HttpResponse: response,
Stats: stats,
})
}
...
}
frontend 通过 gRPC 获取到查询响应,位于 pkg/frontend/v2/frontend.go 中:
// QueryResult receives a query result sent by a querier. The excerpt shows
// the pending request being looked up and the result handed to it.
func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
...
// Look up the pending request by the query ID carried in the result.
req := f.requests.get(qrReq.QueryID)
...
// Deliver the result on the request's response channel.
case req.response <- qrReq:
...
}
网友评论