Prometheus Data Storage

Author: 酱油王0901 | Published: 2019-10-22 00:46

To be continued....

Prometheus offers two kinds of storage: local storage and remote storage.

Prometheus writes data through the Appender interface, defined in tsdb/db.go:

type Appender interface {
    // Add adds a sample pair for the given series. A reference number is
    // returned which can be used to add further samples in the same or later
    // transactions.
    // Returned reference numbers are ephemeral and may be rejected in calls
    // to AddFast() at any point. Adding the sample via Add() returns a new
    // reference number.
    // If the reference is 0 it must not be used for caching.
    Add(l labels.Labels, t int64, v float64) (uint64, error)

    // AddFast adds a sample pair for the referenced series. It is generally
    // faster than adding a sample by providing its full label set.
    AddFast(ref uint64, t int64, v float64) error

    // Commit submits the collected samples and purges the batch.
    Commit() error

    // Rollback rolls back all modifications made in the appender so far.
    Rollback() error
}

Initialization

Prometheus supports both local and remote storage. They are initialized as follows:

var (
     localStorage  = &tsdb.ReadyStorage{}
     remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline))
     fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

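First, the local and remote storage instances are created, and the fanout storage is built from them. Note that at this point the local storage is not yet bound to an actual storage: once the TSDB has been opened, it is attached to the TSDB through the ReadyStorage.Set method.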

db, err := tsdb.Open(
    cfg.localStoragePath,
    log.With(logger, "component", "tsdb"),
    prometheus.DefaultRegisterer,
    &cfg.tsdb,
)
......
startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)

To serve local and remote storage through a single interface, Prometheus provides the fanout type, which implements the Storage interface. Its structure is:

type fanout struct {
    logger log.Logger

    primary     Storage
    secondaries []Storage
}

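When a write goes through fanout (for example Add on the appender it hands out), fanout first calls Add on the local storage (primary) and then calls Add on each remote storage (secondaries) in turn.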
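The dispatch order can be sketched with a reduced appender interface. The interface and the recorder test double below are hypothetical; the Commit logic (commit the primary first, then commit the secondaries only while everything so far has succeeded, otherwise roll them back) reflects my reading of storage/fanout.go and should be checked against the version you are reading.

```go
package main

import "fmt"

// appender is a hypothetical, reduced version of storage.Appender.
type appender interface {
	Add(series string, t int64, v float64) (uint64, error)
	Commit() error
	Rollback() error
}

// fanoutAppender writes to the primary first, then to every secondary.
// Only the primary's ref is returned to the caller.
type fanoutAppender struct {
	primary     appender
	secondaries []appender
}

func (f *fanoutAppender) Add(series string, t int64, v float64) (uint64, error) {
	ref, err := f.primary.Add(series, t, v)
	if err != nil {
		return ref, err
	}
	for _, a := range f.secondaries {
		if _, err := a.Add(series, t, v); err != nil {
			return 0, err
		}
	}
	return ref, nil
}

// Commit commits the primary first; once anything has failed,
// the remaining secondaries are rolled back instead of committed.
func (f *fanoutAppender) Commit() error {
	err := f.primary.Commit()
	for _, a := range f.secondaries {
		if err == nil {
			err = a.Commit()
		} else {
			_ = a.Rollback() // rollback errors are ignored in this sketch
		}
	}
	return err
}

// Rollback rolls back every appender and returns the first error seen.
func (f *fanoutAppender) Rollback() error {
	err := f.primary.Rollback()
	for _, a := range f.secondaries {
		if rerr := a.Rollback(); err == nil {
			err = rerr
		}
	}
	return err
}

// recorder is a test double that records what it receives.
type recorder struct {
	added     []string
	committed bool
}

func (r *recorder) Add(series string, t int64, v float64) (uint64, error) {
	r.added = append(r.added, series)
	return uint64(len(r.added)), nil
}
func (r *recorder) Commit() error   { r.committed = true; return nil }
func (r *recorder) Rollback() error { r.added = nil; return nil }

func main() {
	local, remote := &recorder{}, &recorder{}
	var app appender = &fanoutAppender{primary: local, secondaries: []appender{remote}}
	ref, _ := app.Add("go_goroutines", 1000, 98)
	_ = app.Commit()
	fmt.Println(ref, local.added, remote.added, local.committed, remote.committed)
}
```

Returning only the primary's ref means callers cache refs that are meaningful to the local TSDB; the secondaries are expected to cope with being called by labels.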

The Storage interface is defined in storage/interface.go:

// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
    Queryable

    // StartTime returns the oldest timestamp stored in the storage.
    StartTime() (int64, error)

    // Appender returns a new appender against the storage.
    Appender() (Appender, error)

    // Close closes the storage and all its underlying resources.
    Close() error
}

The Appendable interface defines an entity to which data can be appended. It is defined in tsdb/block.go:

// Appendable defines an entity to which data can be appended.
type Appendable interface {
    // Appender returns a new Appender against an underlying store.
    Appender() Appender
}

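In the write path, ReadyStorage, fanout, DB and Head each provide an Appender method (ReadyStorage and fanout through storage.Storage, DB and Head through tsdb's Appendable), each wrapping the layer below it. Data is written to the head block first.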

Scraping

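Scraping works as follows: the scraper instance inside each scrapeLoop scrapes its target periodically and first writes the scraped data into buf. The implementation is in scrape/scrape.go: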

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    ......

    b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
    buf := bytes.NewBuffer(b)

    // Scrape the target into buf.
    contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)

    ......

    // A failed scrape is the same as an empty scrape,
    // we still call sl.append to trigger stale markers.
    // scrapeLoop's append method parses and stores the scraped data (analyzed below).
    // b holds the raw bytes read into buf; start is the time this scrape iteration started.
    total, added, seriesAdded, appErr := sl.append(b, contentType, start)
    if appErr != nil {
        level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
        // The append failed, probably due to a parse error or sample limit.
        // Call sl.append again with an empty scrape to trigger stale markers.
        if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
            level.Warn(sl.l).Log("msg", "append failed", "err", err)
        }
    }

    sl.buffers.Put(b)
    ......
}

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
    scrape(ctx context.Context, w io.Writer) (string, error)
    report(start time.Time, dur time.Duration, err error)
    offset(interval time.Duration, jitterSeed uint64) time.Duration
}

The scraped data is in the Prometheus text exposition format, for example:

# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 2.3944e-05
go_gc_duration_seconds{quantile="0.25"} 4.9177e-05
go_gc_duration_seconds{quantile="0.5"} 0.000121297
go_gc_duration_seconds{quantile="0.75"} 0.000319643
go_gc_duration_seconds{quantile="1"} 0.027424647
go_gc_duration_seconds_sum 2029.637676688
go_gc_duration_seconds_count 17095
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 98
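Prometheus parses this format with PromParser. The function below is not that parser; it is a deliberately simplified, hypothetical reader that only shows the shape of a sample line: a series (metric name plus optional label block), a value, and an optional millisecond timestamp. It assumes label values contain no '}' and ignores escaping, both of which the real parser handles.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsedSample is a hypothetical result type: the raw series string
// (metric name plus label block), its value and an optional timestamp.
type parsedSample struct {
	series string
	value  float64
	ts     *int64 // milliseconds, nil if the line has no timestamp
}

// parseText skips blank and "#" lines and splits each sample line into
// series, value and optional timestamp.
func parseText(input string) ([]parsedSample, error) {
	var out []parsedSample
	for _, line := range strings.Split(input, "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue // HELP/TYPE lines carry metadata, not samples
		}
		// The series ends after '}' if there are labels, otherwise at the first space.
		end := strings.IndexByte(line, '}') + 1
		if end == 0 {
			end = strings.IndexByte(line, ' ')
			if end < 0 {
				return nil, fmt.Errorf("no value in line %q", line)
			}
		}
		fields := strings.Fields(line[end:])
		if len(fields) == 0 || len(fields) > 2 {
			return nil, fmt.Errorf("malformed line %q", line)
		}
		v, err := strconv.ParseFloat(fields[0], 64)
		if err != nil {
			return nil, err
		}
		s := parsedSample{series: line[:end], value: v}
		if len(fields) == 2 {
			ts, err := strconv.ParseInt(fields[1], 10, 64)
			if err != nil {
				return nil, err
			}
			s.ts = &ts
		}
		out = append(out, s)
	}
	return out, nil
}

func main() {
	in := `# TYPE go_goroutines gauge
go_gc_duration_seconds{quantile="0.5"} 0.000121297
go_goroutines 98`
	samples, err := parseText(in)
	if err != nil {
		panic(err)
	}
	for _, s := range samples {
		fmt.Println(s.series, s.value)
	}
}
```

The raw series string is exactly what scrapeLoop uses as its cache key before converting it to labels, which is why a cache hit can skip label parsing entirely.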

scrapeLoop

The scrapeLoop struct is:

type scrapeLoop struct {
    scraper         scraper
    cache           *scrapeCache
    lastScrapeSize  int
    buffers         *pool.Pool

    appender            func() storage.Appender
    .....
}

A scrapeLoop is constructed by its scrapePool, where:

  • appender is built from the scrapePool's appendable.Appender(), in scrape/scrape.go:
    func() storage.Appender {
        app, err := app.Appender()
        if err != nil {
            panic(err)
        }
        return appender(app, opts.limit)
    }
    
  • scraper is a field of scrapeLoopOptions and is a targetScraper instance. targetScraper implements the scraper interface above and periodically pulls data from its target over HTTP.

The scrapePool struct is:

type scrapePool struct {
    appendable Appendable
    logger     log.Logger

    mtx    sync.RWMutex
    config *config.ScrapeConfig
    client *http.Client
    // Targets and loops must always be synchronized to have the same
    // set of hashes.
    activeTargets  map[uint64]*Target
    droppedTargets []*Target
    loops          map[uint64]loop
    cancel         context.CancelFunc

    // Constructor for new scrape loops. This is settable for testing convenience.
    newLoop func(scrapeLoopOptions) loop
}

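scrapePool manages the scraping of targets. Its fields are:

  • activeTargets holds the active targets.
  • loops maps each target's hash to the scrape loop for that target.
  • newLoop is the scrapeLoop constructor; it builds a loop from scrapeLoopOptions.
  • appendable is the storage from which the loops obtain their Appender (the fanout storage).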

All scrapePools are maintained by the scrapeManager, which is initialized at Prometheus startup with the storage passed in:

scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)

The scrapeManager struct (Manager) is defined in scrape/manager.go:

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {
    logger    log.Logger
    append    Appendable
    graceShut chan struct{}

    jitterSeed    uint64     // Global jitterSeed seed is used to spread scrape workload across HA setup.
    mtxScrape     sync.Mutex // Guards the fields below.
    scrapeConfigs map[string]*config.ScrapeConfig
    scrapePools   map[string]*scrapePool
    targetSets    map[string][]*targetgroup.Group

    triggerReload chan struct{}
}

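The scrapeManager's Run method receives and stores the changed target sets sent by the discovery manager and updates its target information. It then calls reload, which creates a new scrapePool for every target set that does not have one yet and syncs each pool with its target groups: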

func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
                continue
            }
            // Create a new scrapePool; m.append is the fanoutStorage.
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            if err != nil {
                level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
                continue
            }
            m.scrapePools[setName] = sp
        }

        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            // Sync converts the target groups into actual scrape targets.
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)

    }
    m.mtxScrape.Unlock()
    wg.Wait()
}
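The reload logic boils down to two steps: lazily create a pool for each new target set that has a config, then sync all pools in parallel and wait for them. Below is a minimal sketch of that pattern with hypothetical manager and pool types; targets are reduced to strings, and a set without a config is simply skipped (the real code logs an error and continues).

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for scrapePool.
type pool struct {
	mtx     sync.Mutex
	targets []string
}

// Sync replaces the pool's targets; the real Sync also starts and stops loops.
func (p *pool) Sync(targets []string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.targets = append([]string(nil), targets...)
}

// manager is a hypothetical, reduced version of scrape.Manager.
type manager struct {
	mtx        sync.Mutex
	configs    map[string]bool     // target set names that have a scrape config
	pools      map[string]*pool    // one pool per target set
	targetSets map[string][]string // latest targets per set
}

// reload creates missing pools, then syncs every pool concurrently.
func (m *manager) reload() {
	m.mtx.Lock()
	var wg sync.WaitGroup
	for name, targets := range m.targetSets {
		if _, ok := m.pools[name]; !ok {
			if !m.configs[name] {
				continue // no config for this set: skip it
			}
			m.pools[name] = &pool{}
		}
		wg.Add(1)
		go func(p *pool, targets []string) {
			defer wg.Done()
			p.Sync(targets)
		}(m.pools[name], targets)
	}
	m.mtx.Unlock()
	wg.Wait()
}

func main() {
	m := &manager{
		configs:    map[string]bool{"node": true},
		pools:      map[string]*pool{},
		targetSets: map[string][]string{"node": {"a:9100", "b:9100"}, "unknown": {"c:1"}},
	}
	m.reload()
	fmt.Println(len(m.pools), m.pools["node"].targets) // 1 [a:9100 b:9100]
}
```

As in reload above, the lock is held only while walking the maps; the slow per-pool Sync calls run concurrently and are awaited after the lock is released.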

Storing Data

Continuing from the scraping section, let's look at scrapeLoop's append method.

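  • First, the scraped data is parsed and added. PromParser walks through the scraped data entry by entry. For an EntrySeries entry, scrapeLoop first checks its cache for the metric: on a hit it calls app.AddFast directly with the cached ref; on a miss it converts the series to labels, applies relabeling (sampleMutator) and calls app.Add.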

    t := defTime
    met, tp, v := p.Series()
    if !sl.honorTimestamps {
        tp = nil
    }
    if tp != nil {
        t = *tp
    }
    
    if sl.cache.getDropped(yoloString(met)) {
        continue
    }
    ce, ok := sl.cache.get(yoloString(met))
    if ok {
        // The metric is already in scrapeLoop's cache.
        switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
        ......
        }
    }
    if !ok {
        var lset labels.Labels
    
        // Convert the parsed series into a Labels set.
        mets := p.Metric(&lset)
        // Compute its hash.
        hash := lset.Hash()
    
        // Hash label set as it is seen local to the target. Then add target labels
        // and relabeling and store the final label set.
        lset = sl.sampleMutator(lset)
    
        // The label set may be set to nil to indicate dropping.
        if lset == nil {
            sl.cache.addDropped(mets)
            continue
        }
    
        var ref uint64
        ref, err = app.Add(lset, t, v)
        .....
    }
    .....
    if err != nil {
        app.Rollback()
        return total, added, seriesAdded, err
    }
    if err := app.Commit(); err != nil {
        return total, added, seriesAdded, err
    }
    .....
    
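  • To find out which implementation app.Add or app.AddFast actually resolves to, we first need the concrete type of app. We can start by inspecting the fanout structure with dlv: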

    (dlv) p f
    *github.com/prometheus/prometheus/storage.fanout {
        logger: ...,
        primary: github.com/prometheus/prometheus/storage.Storage(*github.com/prometheus/prometheus/storage/tsdb.ReadyStorage) *{
                mtx: (*sync.RWMutex)(0xc0000d8740),
                a: *github.com/prometheus/prometheus/storage/tsdb.adapter {
                            db: *github.com/prometheus/prometheus/tsdb.DB {
                                dir: "/opt/sds/prometheus/",
                                lockf: github.com/prometheus/prometheus/tsdb/fileutil.Releaser(*github.com/prometheus/prometheus/tsdb/fileutil.unixLock) ...,
                                logger: github.com/prometheus/prometheus/vendor/github.com/go-kit/kit/log.Logger(*github.com/prometheus/prometheus/vendor/github.com/go-kit/kit/log.context) ...,
                                metrics: *(*github.com/prometheus/prometheus/tsdb.dbMetrics)(0xc000519520),
                                opts: *(*github.com/prometheus/prometheus/tsdb.Options)(0xc0005d80c0),
                                chunkPool: github.com/prometheus/prometheus/tsdb/chunkenc.Pool(*github.com/prometheus/prometheus/tsdb/chunkenc.pool) ...,
                                compactor: github.com/prometheus/prometheus/tsdb.Compactor(*github.com/prometheus/prometheus/tsdb.LeveledCompactor) ...,
                                mtx: (*sync.RWMutex)(0xc0005194b0),
                                blocks: []*github.com/prometheus/prometheus/tsdb.Block len: 7, cap: 8, [
                                    *(*github.com/prometheus/prometheus/tsdb.Block)(0xc00042ca00),
                                    *(*github.com/prometheus/prometheus/tsdb.Block)(0xc00042cb40),
                                    *(*github.com/prometheus/prometheus/tsdb.Block)(0xc00042cc80),
                                    *(*github.com/prometheus/prometheus/tsdb.Block)(0xc00042cdc0),
                                    *(*github.com/prometheus/prometheus/tsdb.Block)(0xc00042d2c0),
                                    *(*github.com/prometheus/prometheus/tsdb.Block)(0xc00042cf00),
                                    *(*github.com/prometheus/prometheus/tsdb.Block)(0xc00042d180),
                                ],
                                head: *(*github.com/prometheus/prometheus/tsdb.Head)(0xc0004705a0),
                                compactc: ...,
                                donec: ...,
                                stopc: ...,
        secondaries: []github.com/prometheus/prometheus/storage.Storage len: 1, cap: 1, [
            *github.com/prometheus/prometheus/storage/remote.Storage {
                logger:...,
                rws: *(*github.com/prometheus/prometheus/storage/remote.WriteStorage)(0xc0004421b0),
                queryables: []github.com/prometheus/prometheus/storage.Queryable len: 0, cap: 0, [],
                localStartTimeCallback: github.com/prometheus/prometheus/storage/tsdb.(*ReadyStorage).StartTime-fm,},
          ]
     }
    

    The structure of app inside scrapeLoop.append() is:

    (dlv) p app
    github.com/prometheus/prometheus/storage.Appender(*github.com/prometheus/prometheus/scrape.timeLimitAppender) *{
        Appender: github.com/prometheus/prometheus/storage.Appender(*github.com/prometheus/prometheus/storage.fanoutAppender) *{
            logger: ...,
            primary: github.com/prometheus/prometheus/storage.Appender(github.com/prometheus/prometheus/storage/tsdb.appender) *(*"github.com/prometheus/prometheus/storage.Appender")(0xc000087d50),
            secondaries: []github.com/prometheus/prometheus/storage.Appender len: 1, cap: 1, [
                ...,
            ],},
        maxTime: 1571838823121,}
    
    (dlv) p app.Appender.primary
    github.com/prometheus/prometheus/storage.Appender(github.com/prometheus/prometheus/storage/tsdb.appender) {
        a: github.com/prometheus/prometheus/tsdb.Appender(github.com/prometheus/prometheus/tsdb.dbAppender) {
            Appender: github.com/prometheus/prometheus/tsdb.Appender(*github.com/prometheus/prometheus/tsdb.headAppender) ...,
            db: *(*github.com/prometheus/prometheus/tsdb.DB)(0xc0005ca1a0),},}
    (dlv) p app.Appender.secondaries
    []github.com/prometheus/prometheus/storage.Appender len: 1, cap: 1, [
        *github.com/prometheus/prometheus/storage/remote.timestampTracker {
            writeStorage: *(*github.com/prometheus/prometheus/storage/remote.WriteStorage)(0xc0003d99e0),
            samples: 0,
            highestTimestamp: 0,},
    ]
    
    The fanoutAppender is built by fanout's Appender method:

    func (f *fanout) Appender() (Appender, error) {
        primary, err := f.primary.Appender()
        if err != nil {
            return nil, err
        }
    
        secondaries := make([]Appender, 0, len(f.secondaries))
        for _, storage := range f.secondaries {
            appender, err := storage.Appender()
            if err != nil {
                return nil, err
            }
            secondaries = append(secondaries, appender)
        }
        return &fanoutAppender{
            logger:      f.logger,
            primary:     primary,
            secondaries: secondaries,
        }, nil
    }
    

    From this and the earlier analysis, primary is the appender returned by adapter's Appender method; see storage/tsdb/tsdb.go:

    // Appender returns a new appender against the storage.
    func (a adapter) Appender() (storage.Appender, error) {
        return appender{a: a.db.Appender()}, nil
    }
    

    Tracing further, appender implements the storage.Appender interface and is defined as:

    type appender struct {
        a tsdb.Appender
    }
    

    The Appender method of the db held by adapter returns:

    // Appender opens a new appender against the database.
    func (db *DB) Appender() Appender {
        return dbAppender{db: db, Appender: db.head.Appender()}
    }
    
    // dbAppender wraps the DB's head appender and triggers compactions on commit
    // if necessary.
    type dbAppender struct {
        Appender
        db *DB
    }
    

    The db.head.Appender method is:

    // Appender returns a new Appender on the database.
    func (h *Head) Appender() Appender {
        h.metrics.activeAppenders.Inc()
    
        // The head cache might not have a starting point yet. The init appender
        // picks up the first appended timestamp as the base.
        if h.MinTime() == math.MaxInt64 {
            return &initAppender{head: h}
        }
        return h.appender()
    }
    
    func (h *Head) appender() *headAppender {
        return &headAppender{
            head: h,
            // Set the minimum valid time to whichever is greater the head min valid time or the compaction window.
            // This ensures that no samples will be added within the compaction window to avoid races.
            minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2),
            mint:         math.MaxInt64,
            maxt:         math.MinInt64,
            samples:      h.getAppendBuffer(),
        }
    }
    

    The headAppender struct is:

    type headAppender struct {
        head         *Head
        minValidTime int64 // No samples below this timestamp are allowed.
        mint, maxt   int64
    
        series  []RefSeries
        samples []RefSample
    }
    

    Putting this together, the call chain of app.Add(lset, t, v) in scrapeLoop is:

    +---+-----------------------+----------------------+
    | # | Method                | Source file          |
    +---+-----------------------+----------------------+
    | 1 | timeLimitAppender.Add | scrape/target.go     |
    | 2 | fanoutAppender.Add    | storage/fanout.go    |
    | 3 | appender.Add          | storage/tsdb/tsdb.go |
    | 4 | dbAppender.Add        | tsdb/db.go           |
    | 5 | headAppender.Add      | tsdb/head.go         |
    +---+-----------------------+----------------------+
    

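    app.AddFast follows a similar path. One thing I am unsure about is why two Appender interfaces are defined, one in the storage package and one in the tsdb package, with slightly different methods. headAppender implements the tsdb package's Appender interface, and since it sits at the end of the chain, data is written to the head block first.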

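  • After the samples are added, Commit is called and travels down the same chain as Add. dbAppender.Commit first commits the underlying headAppender, then checks whether db.head is compactable and, if so, notifies the compaction loop through db.compactc with a non-blocking send: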

    func (a dbAppender) Commit() error {
        err := a.Appender.Commit()
    
        // We could just run this check every few minutes practically. But for benchmarks
        // and high frequency use cases this is the safer way.
        if a.db.head.compactable() {
            select {
            case a.db.compactc <- struct{}{}:
            default:
            }
        }
        return err
    }
    

headAppender

Since data is written to the head block first, it is worth looking at the head-related source in detail.

type headAppender struct {
    head         *Head
    minValidTime int64 // No samples below this timestamp are allowed.
    mint, maxt   int64

    series  []RefSeries
    samples []RefSample
}

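headAppender implements tsdb's Appender interface.
Its Commit method first calls log, which writes the pending series and samples to the WAL as separate records, and then appends the samples to the in-memory series.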

func (a *headAppender) log() error {
    if a.head.wal == nil {
        return nil
    }

    buf := a.head.getBytesBuffer()
    defer func() { a.head.putBytesBuffer(buf) }()

    var rec []byte
    var enc RecordEncoder

    if len(a.series) > 0 {
        rec = enc.Series(a.series, buf)
        buf = rec[:0]

        if err := a.head.wal.Log(rec); err != nil {
            return errors.Wrap(err, "log series")
        }
    }
    if len(a.samples) > 0 {
        rec = enc.Samples(a.samples, buf)
        buf = rec[:0]

        if err := a.head.wal.Log(rec); err != nil {
            return errors.Wrap(err, "log samples")
        }
    }
    return nil
}

The earlier article Prometheus checkpoint covered the RecordDecoder source. Here we look at the encoding side, i.e. how series and samples are encoded into records.

// RecordEncoder encodes series, sample, and tombstones records.
// The zero value is ready to use.
type RecordEncoder struct {
}

const (
    // RecordInvalid is returned for unrecognised WAL record types.
    RecordInvalid RecordType = 255
    // RecordSeries is used to match WAL records of type Series.
    RecordSeries RecordType = 1
    // RecordSamples is used to match WAL records of type Samples.
    RecordSamples RecordType = 2
    // RecordTombstones is used to match WAL records of type Tombstones.
    RecordTombstones RecordType = 3
)

Let's look at how RecordEncoder encodes series into a byte slice, i.e. a record:

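  • First, write a one-byte record type.
  • Then, for each series:
    • write the 8-byte ref (big-endian);
    • write the number of labels as a uvarint;
    • write each label's Name and Value, each as a uvarint length followed by the string bytes.
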
// Series appends the encoded series to b and returns the resulting slice.
func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte {
    buf := encoding.Encbuf{B: b}
    buf.PutByte(byte(RecordSeries))

    for _, s := range series {
        buf.PutBE64(s.Ref)
        buf.PutUvarint(len(s.Labels))

        for _, l := range s.Labels {
            buf.PutUvarintStr(l.Name)
            buf.PutUvarintStr(l.Value)
        }
    }
    return buf.Get()
}

The series record format is:

+--------------------------------------+-------------------------------------+
|       Name                           |      Value                          |
+---------------------+----------------+----------------------+--------------+
|len(str_n) <uvarint> | str_n <bytes>  | len(str_n) <uvarint> | str_n <bytes>|
+---------------------+----------------+----------------------+--------------+

+-------------------------------------------------+
| type <1b>                                       |
+-------------------------------------------------+
| +---------------------------------------------+ |
| | ref <8b>                                    | |
| +---------------------------------------------+ |
| | len(labels) <uvarint>                       | |
| +----------------------+----------------------+ |
| |    +---------------+-------------------+    | |
| |    |     Name      |       Value       |    | |
| |    +---------------+-------------------+    | |
| |    |            . . .                  |    | |
| |    +---------------+-------------------+    | |
| +----------------------+----------------------+ |
|                   . . .                         |  
+-------------------------------------------------+
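The same series layout can be produced with only the standard library. This sketch uses encoding/binary (AppendUvarint and BigEndian.AppendUint64 need Go 1.19 or later) instead of Prometheus's internal encoding.Encbuf; the type names label and refSeries are hypothetical stand-ins for labels.Label and RefSeries.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const recordSeries byte = 1

type label struct{ Name, Value string }

type refSeries struct {
	Ref    uint64
	Labels []label
}

// appendUvarintStr writes a uvarint length prefix followed by the string bytes.
func appendUvarintStr(b []byte, s string) []byte {
	b = binary.AppendUvarint(b, uint64(len(s)))
	return append(b, s...)
}

// encodeSeries follows the layout of RecordEncoder.Series.
func encodeSeries(series []refSeries, b []byte) []byte {
	b = append(b, recordSeries) // 1-byte type
	for _, s := range series {
		b = binary.BigEndian.AppendUint64(b, s.Ref)         // 8-byte ref
		b = binary.AppendUvarint(b, uint64(len(s.Labels))) // label count
		for _, l := range s.Labels {
			b = appendUvarintStr(b, l.Name)
			b = appendUvarintStr(b, l.Value)
		}
	}
	return b
}

func main() {
	rec := encodeSeries([]refSeries{{Ref: 7, Labels: []label{{"__name__", "up"}}}}, nil)
	fmt.Printf("% x\n", rec)
}
```

For a single series with ref 7 and the label __name__="up", the record is 22 bytes: 1 type byte, 8 ref bytes, 1 byte for the label count, then 1+8 and 1+2 bytes for the length-prefixed name and value.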

The Samples method encodes samples into a samples record and returns the resulting slice.

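  • First, write a one-byte type.
  • Next, write the reference number and the timestamp of the first sample as the base, 8 bytes each.
  • Then iterate over all samples, including the first, writing each sample's ref and timestamp as deltas from the base:
    • the ref delta as a signed varint;
    • the timestamp delta as a signed varint;
    • the float value, converted to its IEEE 754 bit pattern (math.Float64bits) and stored as an 8-byte big-endian uint64.
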
// Samples appends the encoded samples to b and returns the resulting slice.
func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte {
    buf := encoding.Encbuf{B: b}
    buf.PutByte(byte(RecordSamples))

    if len(samples) == 0 {
        return buf.Get()
    }
    
    // Store base timestamp and base reference number of first sample.
    // All samples encode their timestamp and ref as delta to those.
    first := samples[0]

    buf.PutBE64(first.Ref)
    buf.PutBE64int64(first.T)

    for _, s := range samples {
        buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
        buf.PutVarint64(s.T - first.T)
        buf.PutBE64(math.Float64bits(s.V))
    }
    return buf.Get()
}

The samples record format is:

+-------------------------------------------------+
| type <1b>                                       |
+-------------------------------------------------+
| first sample reference number  <8b>             |
+-------------------------------------------------+
| first sample timestamp  <8b>                    |
+-------------------------------------------------+
| +---------------------------------------------+ |
| | delta ref <varint>                          | |
| +---------------------------------------------+ |
| | delta timestamp <varint>                    | |
| +---------------------------------------------+ |
| | value <8b>                                  | |
| +---------------------------------------------+ |
|                   . . .                         |  
+-------------------------------------------------+
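Below is a round-trip sketch of the samples layout using encoding/binary (Go 1.19+). The deltas are written as signed varints because a later sample can have a smaller ref than the first one, which gives a negative delta. decodeSamples is a hypothetical helper for checking the layout, not the real RecordDecoder.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

const recordSamples byte = 2

type refSample struct {
	Ref uint64
	T   int64
	V   float64
}

// encodeSamples follows RecordEncoder.Samples: type byte, 8-byte base ref and
// base timestamp, then per sample two signed varint deltas and 8 value bytes.
func encodeSamples(samples []refSample, b []byte) []byte {
	b = append(b, recordSamples)
	if len(samples) == 0 {
		return b
	}
	first := samples[0]
	b = binary.BigEndian.AppendUint64(b, first.Ref)
	b = binary.BigEndian.AppendUint64(b, uint64(first.T))
	for _, s := range samples {
		b = binary.AppendVarint(b, int64(s.Ref)-int64(first.Ref))
		b = binary.AppendVarint(b, s.T-first.T)
		b = binary.BigEndian.AppendUint64(b, math.Float64bits(s.V))
	}
	return b
}

// decodeSamples reverses encodeSamples.
func decodeSamples(rec []byte) ([]refSample, error) {
	if len(rec) == 0 || rec[0] != recordSamples {
		return nil, errors.New("not a samples record")
	}
	rec = rec[1:]
	if len(rec) == 0 {
		return nil, nil
	}
	if len(rec) < 16 {
		return nil, errors.New("truncated header")
	}
	baseRef := binary.BigEndian.Uint64(rec)
	baseT := int64(binary.BigEndian.Uint64(rec[8:]))
	rec = rec[16:]
	var out []refSample
	for len(rec) > 0 {
		dref, n := binary.Varint(rec)
		if n <= 0 {
			return nil, errors.New("bad ref delta")
		}
		rec = rec[n:]
		dt, n := binary.Varint(rec)
		if n <= 0 || len(rec[n:]) < 8 {
			return nil, errors.New("bad timestamp delta or value")
		}
		rec = rec[n:]
		v := math.Float64frombits(binary.BigEndian.Uint64(rec))
		rec = rec[8:]
		out = append(out, refSample{Ref: uint64(int64(baseRef) + dref), T: baseT + dt, V: v})
	}
	return out, nil
}

func main() {
	in := []refSample{{Ref: 10, T: 1000, V: 98}, {Ref: 12, T: 1015, V: 0.5}, {Ref: 9, T: 1030, V: -1}}
	rec := encodeSamples(in, nil)
	out, err := decodeSamples(rec)
	fmt.Println(len(rec), out, err)
}
```

With three samples whose deltas each fit in one varint byte, every sample costs 10 bytes after the 17-byte header, so the record is 47 bytes.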

The Tombstones method encodes stones into a tombstone record and returns the resulting slice.

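  • First, write a one-byte type.
  • Then iterate over all stones:
    • since each stone can hold several time intervals, iterate over each of its intervals and:
      • write the stone's ref (8 bytes, big-endian);
      • write the interval's Mint as a signed varint;
      • write the interval's Maxt as a signed varint.
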
// Interval represents a single time-interval.
type Interval struct {
    Mint, Maxt int64
}

// Tombstones appends the encoded tombstones to b and returns the resulting slice.
func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte {
    buf := encoding.Encbuf{B: b}
    buf.PutByte(byte(RecordTombstones))

    for _, s := range tstones {
        for _, iv := range s.intervals {
            buf.PutBE64(s.ref)
            buf.PutVarint64(iv.Mint)
            buf.PutVarint64(iv.Maxt)
        }
    }
    return buf.Get()
}

The tombstone record format is:

+-------------------------------------------------+
| type <1b>                                       |
+-------------------------------------------------+
| +---------------------------------------------+ |
| |    +-----------------------------------+    | |
| |    |    stone ref <8b>                 |    | |
| |    +-----------------------------------+    | |
| |    |    interval Mint <varint>         |    | |
| |    +-----------------------------------+    | |
| |    |    interval Maxt <varint>         |    | |
| |    +-----------------------------------+    | |
| |                 . . .                       | |
| +----------------------+----------------------+ |
|                   . . .                         |  
+-------------------------------------------------+
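One consequence of this layout is that a stone with several intervals repeats its 8-byte ref once per interval, so a decoder sees one (ref, interval) entry per interval. A sketch using encoding/binary (Go 1.19+); the types interval and stone mirror the names above but are local stand-ins, and decodeTombstones is a hypothetical helper.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const recordTombstones byte = 3

type interval struct{ Mint, Maxt int64 }

type stone struct {
	ref       uint64
	intervals []interval
}

// encodeTombstones follows RecordEncoder.Tombstones: the 8-byte ref is
// written again before every interval.
func encodeTombstones(tstones []stone, b []byte) []byte {
	b = append(b, recordTombstones)
	for _, s := range tstones {
		for _, iv := range s.intervals {
			b = binary.BigEndian.AppendUint64(b, s.ref)
			b = binary.AppendVarint(b, iv.Mint)
			b = binary.AppendVarint(b, iv.Maxt)
		}
	}
	return b
}

// decodeTombstones returns one single-interval stone per (ref, interval) entry.
func decodeTombstones(rec []byte) ([]stone, bool) {
	if len(rec) == 0 || rec[0] != recordTombstones {
		return nil, false
	}
	rec = rec[1:]
	var out []stone
	for len(rec) > 0 {
		if len(rec) < 8 {
			return nil, false
		}
		ref := binary.BigEndian.Uint64(rec)
		rec = rec[8:]
		mint, n := binary.Varint(rec)
		if n <= 0 {
			return nil, false
		}
		rec = rec[n:]
		maxt, n := binary.Varint(rec)
		if n <= 0 {
			return nil, false
		}
		rec = rec[n:]
		out = append(out, stone{ref: ref, intervals: []interval{{mint, maxt}}})
	}
	return out, true
}

func main() {
	rec := encodeTombstones([]stone{{ref: 5, intervals: []interval{{0, 100}, {200, 300}}}}, nil)
	out, ok := decodeTombstones(rec)
	fmt.Println(len(rec), out, ok)
}
```

Here the stone with two intervals produces a 24-byte record: 1 type byte plus 11 and 12 bytes for the two (ref, Mint, Maxt) entries, the ref being stored twice.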

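After RecordEncoder has encoded series and samples into a series record and a samples record, each record is written to the WAL by calling the WAL's Log method; see the earlier article on reading the Prometheus WAL source for details.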


Open Questions

  • Why are there two Appender interfaces, one in the storage package and one in the tsdb package, with slightly different method sets?


Article link: https://www.haomeiwen.com/subject/cslfvctx.html