美文网首页
beyla源码简单分析

beyla源码简单分析

作者: wwq2020 | 来源:发表于2024-01-14 16:13 被阅读0次

    背景

    beyla是一个基于ebpf的http/https服务的自动instrumentation的工具

    这边源码分析以go的net/http.RoundTrip举例

    源码

    ringbuffer writer->ringbuffer reader/span writer->span reader/trace reporter/metric reporter

    ringbuffer writer

    events的ringbuffer
    bpf/ringbuf.h

    // BPF map of type BPF_MAP_TYPE_RINGBUF named `events`. The roundTrip return
    // probe reserves and submits http_request_trace records into this map.
    struct {
        __uint(type, BPF_MAP_TYPE_RINGBUF);
        __uint(max_entries, 1 << 24); // 1 << 24 == 16777216
    } events SEC(".maps");
    

    写入ringbuffer
    bpf/go_nethttp.c中

    请求的bpf map
    // Hash map of in-flight HTTP client requests. uprobe_roundTrip stores an
    // http_func_invocation_t under the goroutine pointer, and
    // uprobe_roundTripReturn looks the entry up again with the same key.
    struct {
        __uint(type, BPF_MAP_TYPE_HASH);
        __type(key, void *); // key: pointer to the request goroutine
        __type(value, http_func_invocation_t);
        __uint(max_entries, MAX_CONCURRENT_REQUESTS);
    } ongoing_http_client_requests SEC(".maps");
    
    
    
    roundtrip开始
    SEC("uprobe/roundTrip")
    int uprobe_roundTrip(struct pt_regs *ctx) {
    ...
    读取goroutine地址
        void *goroutine_addr = GOROUTINE_PTR(ctx);
    ...
    读取参数
        void *req = GO_PARAM2(ctx);
    ...
    记录请求开始时间参数等
        http_func_invocation_t invocation = {
            .start_monotime_ns = bpf_ktime_get_ns(),
            .req_ptr = (u64)req,
            .tp = {0}
        };
    ...
    写入请求相关信息到bpf map
        if (bpf_map_update_elem(&ongoing_http_client_requests, &goroutine_addr, &invocation, BPF_ANY)) {
            bpf_dbg_printk("can't update http client map element");
        }
    ...
        return 0;
    }
    
    SEC("uprobe/roundTrip_return")
    int uprobe_roundTripReturn(struct pt_regs *ctx) {
    ...
        void *goroutine_addr = GOROUTINE_PTR(ctx);
    ...
    从bpf map中读取请求相关信息
        http_func_invocation_t *invocation =
            bpf_map_lookup_elem(&ongoing_http_client_requests, &goroutine_addr);
    ...
    ringbuffer预留http_request_trace
        http_request_trace *trace = bpf_ringbuf_reserve(&events, sizeof(http_request_trace), 0);
    ...
        trace->tp = invocation->tp;
    ...
        写入http_request_trace到ringbuffer
        bpf_ringbuf_submit(trace, get_flags());
    ...
    }
    

    ringbuffer reader/span writer

    运行tracer
    pkg/internal/ebpf/nethttp/nethttp.go中

    运行tracer
    // Run builds a ring buffer forwarder over the tracer's Events map and runs it
    // with the given context, sending the resulting span batches to eventsChan.
    func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span, service svc.ID) {
        // The BPF objects are closed together with the tracer's other closers.
        closers := append(p.closers, &p.bpfObjects)

        forward := ebpfcommon.ForwardRingbuf[ebpfcommon.HTTPRequestTrace](
            service,
            p.cfg,
            p.log,
            p.bpfObjects.Events,
            ebpfcommon.ReadHTTPRequestTraceAsSpan,
            p.pidsFilter.Filter,
            p.metrics,
            closers...,
        )
        forward(ctx, eventsChan)
    }
    

    读取ringbuffer转发
    pkg/internal/ebpf/common/ringbuf.go中

    reader工厂方法
    // readerFactory creates the ringBufReader for a given eBPF map by delegating
    // to ringbuf.NewReader. It is held in a package-level variable rather than
    // declared as a plain function.
    var readerFactory = func(rb *ebpf.Map) (ringBufReader, error) {
        return ringbuf.NewReader(rb)
    }
    
    转发ringbuffer
    // ForwardRingbuf bundles the given ring buffer map, record reader, span filter,
    // metrics reporter and closers into a ringBufForwarder, and returns that
    // forwarder's readAndForward method as a function taking a context and the
    // channel on which span batches are sent.
    func ForwardRingbuf[T any](
        service svc.ID,
        cfg *TracerConfig,
        logger *slog.Logger,
        ringbuffer *ebpf.Map,
        reader func(*ringbuf.Record) (request.Span, bool, error),
        filter func([]request.Span) []request.Span,
        metrics imetrics.Reporter,
        closers ...io.Closer,
    ) func(context.Context, chan<- []request.Span) {
        forwarder := &ringBufForwarder[T]{
            service:    service,
            cfg:        cfg,
            logger:     logger,
            ringbuffer: ringbuffer,
            closers:    closers,
            reader:     reader,
            filter:     filter,
            metrics:    metrics,
        }
        return forwarder.readAndForward
    }
    
    读取并转发
    func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, spansChan chan<- []request.Span) {
    ...
        for {
            读取events
    ...
    处理并转发ringbuffer.Record
            rbf.processAndForward(record, spansChan)
    ...
            record, err = eventsReader.Read()
        }
    
    }
    
    func (rbf *ringBufForwarder[T]) processAndForward(record ringbuf.Record, spansChan chan<- []request.Span) {
    ...
    ringbuffer.Record转换成request.Span
        s, ignore, err := rbf.reader(&record)
    ...
    flush request.Span
            rbf.flushEvents(spansChan)
    ...
    }
    
    
    func (rbf *ringBufForwarder[T]) flushEvents(spansChan chan<- []request.Span) {
    ...
    发送指标
        rbf.metrics.TracerFlush(rbf.spansLen)
    ...
    过滤后发送给request.Span reader
        spansChan <- rbf.filter(rbf.spans[:rbf.spansLen])
    ...
    }
    

    pkg/internal/ebpf/common/common.go中

    func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record) (request.Span, bool, error) {
    ...
    读取HTTPRequestTrace
        var event HTTPRequestTrace
    
        err = binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
        if err != nil {
            return request.Span{}, true, err
        }
    ...
    HTTPRequestTrace转span
        return HTTPRequestTraceToSpan(&event), false, nil
    }
    

    HTTPRequestTrace转request.Span
    pkg/internal/ebpf/common/spanner.go

    func HTTPRequestTraceToSpan(trace *HTTPRequestTrace) request.Span {
    ...
        return request.Span{
    ...
        }
    }
    

    span reader/metric reporter/trace reporter

    pkg/internal/pipe/instrumenter.go

    // Build validates the configuration and, if it is valid, builds the
    // instrumentation graph that consumes span batches from tracesCh.
    // A validation failure is returned wrapped with context.
    func Build(ctx context.Context, config *Config, ctxInfo *global.ContextInfo, tracesCh <-chan []request.Span) (*Instrumenter, error) {
        err := config.Validate()
        if err != nil {
            return nil, fmt.Errorf("validating configuration: %w", err)
        }

        builder := newGraphBuilder(ctx, config, ctxInfo, tracesCh)
        return builder.buildGraph()
    }
    
    func newGraphBuilder(ctx context.Context, config *Config, ctxInfo *global.ContextInfo, tracesCh <-chan []request.Span) *graphFunctions {
    ...
        graph.RegisterTerminal(gnb, gb.metricsReporterProvider)
        graph.RegisterTerminal(gnb, gb.tracesReporterProvider)
    ...
    }
    
    otel trace adapter
    // tracesReporterProvider returns the terminal function that consumes span
    // batches for trace export, delegating to otel.ReportTraces with the graph
    // builder's context and context info.
    func (gb *graphFunctions) tracesReporterProvider(config otel.TracesConfig) (node.TerminalFunc[[]request.Span], error) {
        return otel.ReportTraces(gb.ctx, &config, gb.ctxInfo)
    }
    
    otel metric adapter
    // metricsReporterProvider returns the terminal function that consumes span
    // batches for metric export, delegating to otel.ReportMetrics with the graph
    // builder's context and context info.
    func (gb *graphFunctions) metricsReporterProvider(config otel.MetricsConfig) (node.TerminalFunc[[]request.Span], error) {
        return otel.ReportMetrics(gb.ctx, &config, gb.ctxInfo)
    }
    

    otel trace reporter
    pkg/internal/export/otel/traces.go中

    构建reporter
    func ReportTraces(ctx context.Context, cfg *TracesConfig, ctxInfo *global.ContextInfo) (node.TerminalFunc[[]request.Span], error) {
    ...
        return tr.reportTraces, nil
    }
    
    reporter方法
    func (r *TracesReporter) reportTraces(input <-chan []request.Span) {
    ...
        for spans := range input {
    ...
    构建otel span
                r.makeSpan(r.ctx, reporter, span)
    ...
        }
    }
    
    func (r *TracesReporter) makeSpan(parentCtx context.Context, tracer trace2.Tracer, span *request.Span) {
    ...
    构建span
        ctx, sp := tracer.Start(parentCtx, traceName(span),
            trace2.WithTimestamp(realStart),
            trace2.WithSpanKind(spanKind(span)),
            trace2.WithAttributes(r.traceAttributes(span)...),
        )
    
        sp.SetStatus(spanStatusCode(span), "")
    ...
    span结束
        sp.End(trace2.WithTimestamp(t.End))
    ...
    }
    

    otel metrics reporter
    pkg/internal/export/otel/metrics.go中

    构建reporter
    func ReportMetrics(
        ctx context.Context, cfg *MetricsConfig, ctxInfo *global.ContextInfo,
    ) (node.TerminalFunc[[]request.Span], error) {
    ...
        return mr.reportMetrics, nil
    }
    
    func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) {
    ...
        for spans := range input {
    ...
    记录指标
                reporter.record(s, mr.metricAttributes(s))
    ...
        }
    ...
    }
    
    根据span类型记录指标
    // record reports the span's duration (from request start to end, in seconds)
    // to the instrument matching the span type. For HTTP server and HTTP client
    // spans it also records the content length as the request size. Span types
    // without a case are not recorded.
    func (r *Metrics) record(span *request.Span, attrs attribute.Set) {
        withAttrs := instrument.WithAttributeSet(attrs)
        timings := span.Timings()
        elapsed := timings.End.Sub(timings.RequestStart).Seconds()

        switch span.Type {
        case request.EventTypeHTTP:
            // TODO: for more accuracy, there must be a way to set the metric time from the actual span end time
            r.httpDuration.Record(r.ctx, elapsed, withAttrs)
            r.httpRequestSize.Record(r.ctx, float64(span.ContentLength), withAttrs)
        case request.EventTypeHTTPClient:
            r.httpClientDuration.Record(r.ctx, elapsed, withAttrs)
            r.httpClientRequestSize.Record(r.ctx, float64(span.ContentLength), withAttrs)
        case request.EventTypeGRPC:
            r.grpcDuration.Record(r.ctx, elapsed, withAttrs)
        case request.EventTypeGRPCClient:
            r.grpcClientDuration.Record(r.ctx, elapsed, withAttrs)
        case request.EventTypeSQLClient:
            r.sqlClientDuration.Record(r.ctx, elapsed, withAttrs)
        }
    }
    

    入口相关

    cmd/beyla/main.go中

    func main() {
    ...
    读取并转发span(供trace/metric reporter使用)
        if err := instr.ReadAndForward(ctx); err != nil {
            slog.Error("Beyla couldn't start read and forwarding", "error", err)
            os.Exit(-1)
        }
    ...
    }
    

    pkg/beyla/beyla.go中

    func (i *Instrumenter) ReadAndForward(ctx context.Context) error {
    构建读取转发pipeline
        bp, err := pipe.Build(ctx, i.config, i.ctxInfo, i.tracesInput)
        if err != nil {
            return fmt.Errorf("can't instantiate instrumentation pipeline: %w", err)
        }
    }
    

    相关文章

      网友评论

          本文标题:beyla源码简单分析

          本文链接:https://www.haomeiwen.com/subject/nqhcodtx.html