美文网首页
Prometheus checkpoint源码阅读

Prometheus checkpoint源码阅读

作者: 酱油王0901 | 来源:发表于2019-10-19 16:52 被阅读0次

    由于Prometheus storage很多地方都借鉴了RocksDB的设计思想,下面引用RocksDB对checkpoint的介绍。

    Checkpoint is a feature in RocksDB which provides the ability to take a snapshot of a running RocksDB database in a separate directory. Checkpoints can be used as a point in time snapshot, which can be opened Read-only to query rows as of the point in time or as a Writeable snapshot by opening it Read-Write. Checkpoints can be used for both full and incremental backups.

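    简单来说,Checkpoint是某一个时间点上的snapshot。在Prometheus的WAL中,checkpoint是把一段范围内的WAL segments(连同上一个checkpoint)压缩成一个新的checkpoint.N目录:不满足keep条件的series以及时间戳早于mint的samples都会被丢弃。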

    概念介绍

    在介绍checkpoint之前我们先来分析一下Prometheus里面的一些术语及其scrape的数据格式。

    # HELP node_cpu_seconds_total Seconds the cpus spent in each mode.
    # TYPE node_cpu_seconds_total counter
    node_cpu_seconds_total{cpu="0",mode="idle"} 52544.27
    node_cpu_seconds_total{cpu="0",mode="nice"} 0
    node_cpu_seconds_total{cpu="0",mode="system"} 6582
    node_cpu_seconds_total{cpu="0",mode="user"} 7838.02
    node_cpu_seconds_total{cpu="1",mode="idle"} 61939.86
    node_cpu_seconds_total{cpu="1",mode="nice"} 0
    node_cpu_seconds_total{cpu="1",mode="system"} 1991.59
    node_cpu_seconds_total{cpu="1",mode="user"} 3031.76
    

    从采样数据可以看出,Prometheus的数据模型主要包含metric names、labels以及samples。

    每个time series都由metric name和可选的labels唯一标识。

    <metric name>{<label name>=<label value>, ...}

    Every time series is uniquely identified by its metric name and optional key-value pairs called labels.

    Labels enable Prometheus's dimensional data model: any given combination of labels for the same metric name identifies a particular dimensional instantiation of that metric.

    Samples包括一个浮点值以及毫秒精度的时间戳。

    Samples form the actual time series data. Each sample consists of:

    • a float64 value
    • a millisecond-precision timestamp

    Prometheus的metric types主要有四种:Counter、Gauge、Histogram以及Summary。


    源码分析

    (ENV) 🍺 /Users/xsky/go/src/github.com/microyahoo/prometheus ☞ tree data -h
    data
    ├── [ 192]  01DPE8T5XPQ9ZYHSNJYBJBKGR6
    │   ├── [  96]  chunks
    │   │   └── [7.1K]  000001
    │   ├── [ 22K]  index
    │   ├── [ 272]  meta.json
    │   └── [   9]  tombstones
    ├── [   0]  lock
    ├── [ 20K]  queries.active
    └── [ 256]  wal
        ├── [   0]  00000050
        ├── [   0]  00000051
        ├── [   0]  00000052
        ├── [   0]  00000053
        ├── [ 10K]  00000054
        └── [  96]  checkpoint.000049
            └── [ 32K]  00000000
    
    4 directories, 12 files
    

    下面的源码分析会用到上述目录结构。可以看到wal目录下有一个名为checkpoint.N的目录(这里是checkpoint.000049),目录中包含相应的checkpoint段文件(00000000)。

    // Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL.
    // It includes the most recent checkpoint if it exists.
    // All series not satisfying keep and samples below mint are dropped.
    //
    // The checkpoint is stored in a directory named checkpoint.N in the same
    // segmented format as the original WAL itself.
    // This makes it easy to read it through the WAL package and concatenate
    // it with the original WAL.
    // Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL.
    // It includes the most recent checkpoint if it exists.
    // All series not satisfying keep and samples below mint are dropped.
    //
    // The checkpoint is stored in a directory named checkpoint.N in the same
    // segmented format as the original WAL itself.
    // This makes it easy to read it through the WAL package and concatenate
    // it with the original WAL.
    //
    // The data is first written to checkpoint.N.tmp and only moved to
    // checkpoint.N (via fileutil.Replace) after it has been fully written and
    // closed. Any checkpoint.N.tmp left over from an earlier attempt that never
    // finished is removed before writing starts.
    func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
        stats := &CheckpointStats{}
        var sgmReader io.ReadCloser

        {
            var sgmRange []wal.SegmentRange
            // Find the most recent existing checkpoint in the WAL directory.
            // dir is its path and idx the numeric suffix of its name
            // (e.g. idx=49 for checkpoint.000049).
            dir, idx, err := LastCheckpoint(w.Dir())
            if err != nil && err != ErrNotFound {
                return nil, errors.Wrap(err, "find last checkpoint")
            }
            last := idx + 1
            // err is ErrNotFound when no checkpoint has been taken yet; then
            // there is no previous checkpoint to merge and from is used as given.
            if err == nil {
                if from > last {
                    return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from)
                }
                // Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
                // Segments up to idx are already covered by the previous
                // checkpoint, so continue right after it.
                from = last

                // Read every segment of the previous checkpoint first so that
                // its surviving records are carried over into the new one.
                sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32})
            }

            // Followed by the WAL segments [from, to].
            sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to})
            sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...)
            if err != nil {
                return nil, errors.Wrap(err, "create segment reader")
            }
            defer sgmReader.Close()
        }

        cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
        cpdirtmp := cpdir + ".tmp"

        // A previous attempt may have died (crash, kill) before its deferred
        // cleanup removed the temporary directory. Start from an empty
        // directory so stale segments from that attempt cannot be picked up
        // by wal.New below and end up in this checkpoint.
        if err := os.RemoveAll(cpdirtmp); err != nil {
            return nil, errors.Wrap(err, "remove previous temporary checkpoint dir")
        }
        if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
            return nil, errors.Wrap(err, "create checkpoint dir")
        }
        // Open a WAL writer in the (now empty) temporary directory; the
        // checkpoint uses the same segmented format as the WAL itself.
        cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
        if err != nil {
            return nil, errors.Wrap(err, "open checkpoint")
        }

        // Ensures that an early return caused by an error doesn't leave any tmp files.
        defer func() {
            cp.Close()
            os.RemoveAll(cpdirtmp)
        }()

        r := wal.NewReader(sgmReader)

        var (
            series  []RefSeries
            samples []RefSample
            tstones []Stone
            dec     RecordDecoder
            enc     RecordEncoder
            buf     []byte
            recs    [][]byte
        )
        // Read records one by one: previous checkpoint first, then the WAL segments.
        for r.Next() {
            series, samples, tstones = series[:0], samples[:0], tstones[:0]

            // We don't reset the buffer since we batch up multiple records
            // before writing them to the checkpoint.
            // Remember where the record for this iteration starts.
            start := len(buf)
            rec := r.Record()

            switch dec.Type(rec) {
            case RecordSeries:
                series, err = dec.Series(rec, series)
                if err != nil {
                    return nil, errors.Wrap(err, "decode series")
                }
                // Drop irrelevant series in place: keep only those accepted by keep.
                repl := series[:0]
                for _, s := range series {
                    if keep(s.Ref) {
                        repl = append(repl, s)
                    }
                }
                // Re-encode the surviving series into buf.
                if len(repl) > 0 {
                    buf = enc.Series(repl, buf)
                }
                stats.TotalSeries += len(series)
                stats.DroppedSeries += len(series) - len(repl)

            case RecordSamples:
                samples, err = dec.Samples(rec, samples)
                if err != nil {
                    return nil, errors.Wrap(err, "decode samples")
                }
                // Drop irrelevant samples in place: only samples at or after
                // mint are kept.
                repl := samples[:0]
                for _, s := range samples {
                    if s.T >= mint {
                        repl = append(repl, s)
                    }
                }
                if len(repl) > 0 {
                    buf = enc.Samples(repl, buf)
                }
                stats.TotalSamples += len(samples)
                stats.DroppedSamples += len(samples) - len(repl)

            case RecordTombstones:
                tstones, err = dec.Tombstones(rec, tstones)
                if err != nil {
                    return nil, errors.Wrap(err, "decode deletes")
                }
                // Drop irrelevant tombstones in place. Samples below mint are
                // dropped above, so a tombstone whose intervals all end before
                // mint would only mask samples this checkpoint drops anyway.
                // Keep a tombstone if any of its intervals reaches mint.
                repl := tstones[:0]
                for _, s := range tstones {
                    for _, iv := range s.intervals {
                        if iv.Maxt >= mint {
                            repl = append(repl, s)
                            break
                        }
                    }
                }
                if len(repl) > 0 {
                    buf = enc.Tombstones(repl, buf)
                }
                stats.TotalTombstones += len(tstones)
                stats.DroppedTombstones += len(tstones) - len(repl)

            default:
                return nil, errors.New("invalid record type")
            }
            if len(buf[start:]) == 0 {
                continue // All contents discarded.
            }
            recs = append(recs, buf[start:])

            // Flush records in 1 MB increments.
            if len(buf) > 1*1024*1024 {
                if err := cp.Log(recs...); err != nil {
                    return nil, errors.Wrap(err, "flush records")
                }
                buf, recs = buf[:0], recs[:0]
            }
        }
        // If we hit any corruption during checkpointing, repairing is not an option.
        // The head won't know which series records are lost.
        if r.Err() != nil {
            return nil, errors.Wrap(r.Err(), "read segments")
        }

        // Flush remaining records.
        if err := cp.Log(recs...); err != nil {
            return nil, errors.Wrap(err, "flush records")
        }
        if err := cp.Close(); err != nil {
            return nil, errors.Wrap(err, "close checkpoint")
        }
        // Move checkpoint.XXXXXX.tmp into place as checkpoint.XXXXXX.
        if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
            return nil, errors.Wrap(err, "rename checkpoint directory")
        }

        return stats, nil
    }
    

    从代码中可以看出,Checkpoint通过wal.NewReader包装的SegmentRangeReader依次读取所有的records。每读取一条record会先判断其类型,record主要有三种类型:Series、Samples和Tombstones,如下所示:

    // RecordSeries is used to match WAL records of type Series.
    RecordSeries RecordType = 1
    // RecordSamples is used to match WAL records of type Samples.
    RecordSamples RecordType = 2
    // RecordTombstones is used to match WAL records of type Tombstones.
    RecordTombstones RecordType = 3
    

    从下面各个Decoder的实现可以看出,record的第一个字节表示其类型(RecordType),每个Decoder都会先读取并校验这个字节。

    • 如果类型为Series,则需要从record中decode所有的series记录。
    // Series appends series in rec to the given slice.
    // 从rec中decode所有的series
    // Series appends series in rec to the given slice.
    //
    // The record starts with one RecordType byte. It is followed by repeated
    // entries, each made of:
    //   - an 8-byte big-endian ref;
    //   - a uvarint label count;
    //   - that many (name, value) string pairs, each string stored as a uvarint
    //     length followed by the string bytes.
    // The labels of each decoded series are sorted before it is appended.
    // A decode error or trailing bytes cause a nil slice and an error.
    func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
        dec := encoding.Decbuf{B: rec}

        if RecordType(dec.Byte()) != RecordSeries {
            return nil, errors.New("invalid record type")
        }
        for len(dec.B) > 0 && dec.Err() == nil {
            ref := dec.Be64()

            // The label count comes straight from the record. Every label
            // needs at least two bytes (one length prefix each for name and
            // value). So a count that is negative or larger than half of the
            // remaining bytes can only come from corrupted data. Reject it
            // before allocating: make would otherwise panic on a negative
            // length or attempt an enormous allocation.
            n := dec.Uvarint()
            if n < 0 || n > len(dec.B)/2 {
                return nil, errors.Errorf("invalid label count %d in series record", n)
            }
            lset := make(labels.Labels, n)

            for i := range lset {
                lset[i].Name = dec.UvarintStr()
                lset[i].Value = dec.UvarintStr()
            }
            sort.Sort(lset)

            series = append(series, RefSeries{
                Ref:    ref,
                Labels: lset,
            })
        }
        if dec.Err() != nil {
            return nil, dec.Err()
        }
        if len(dec.B) > 0 {
            return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
        }
        return series, nil
    }
    
    • 分析如下:
      • 第一个字节为type
      • 接下来的8个字节为ref
      • 接着读取一个Uvarint编码的整数,表示labels的个数。
      • 接着是Name和value组成的label键值对。
        • 其中解析Name和Value时都是先读取Name和Value的长度,接着读取指定长度的[]byte,将其转化为字符串。
        +-------------------+-------------------+
        |       Name        |       Value       |
        +-----------+-------+-----------+-------+
        | Uvarint64 | bytes | Uvarint64 | bytes |
        +-----------+-------+-----------+-------+
        
      • 接着将labels按照name进行排序。
      • 生成一条完整的Series记录。
    +-----------+----------+-------------+------+--------+------+-------+-----+-----+----------+-------------+------+--------+------+-------+-----+-----+
    | type <1b> | ref <8b> | len(labels) | name | value  | name | value | ... | ... | ref <8b> | len(labels) | name | value  | name | value | ... | ... |
    +-----------+----------+-------------+------+--------+------+-------+-----+-----+----------+-------------+------+--------+------+-------+-----+-----+
        byte       uint64     Uvarint
    
    • 如果类型为Samples
    // Samples appends samples in rec to the given slice.
    // Samples decodes a RecordSamples record and appends its samples to the
    // given slice, returning the extended slice. A record that holds only the
    // type byte returns the input slice unchanged.
    //
    // After the type byte the record stores a big-endian base ref and a
    // big-endian base timestamp. Each sample that follows consists of:
    //   - a signed varint delta from the base ref;
    //   - a signed varint delta from the base timestamp;
    //   - the value's float64 bits as 8 big-endian bytes.
    func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
        buf := encoding.Decbuf{B: rec}

        if typ := RecordType(buf.Byte()); typ != RecordSamples {
            return nil, errors.New("invalid record type")
        }
        if buf.Len() == 0 {
            return samples, nil
        }

        baseRef := buf.Be64()
        baseTime := buf.Be64int64()

        for buf.Err() == nil && len(buf.B) > 0 {
            refDelta := buf.Varint64()
            timeDelta := buf.Varint64()
            bits := buf.Be64()

            samples = append(samples, RefSample{
                Ref: uint64(int64(baseRef) + refDelta),
                T:   baseTime + timeDelta,
                V:   math.Float64frombits(bits),
            })
        }

        switch {
        case buf.Err() != nil:
            return nil, errors.Wrapf(buf.Err(), "decode error after %d samples", len(samples))
        case len(buf.B) > 0:
            return nil, errors.Errorf("unexpected %d bytes left in entry", len(buf.B))
        }
        return samples, nil
    }
    
    

    具体格式如下:

    +-----------+--------------+---------------+------+--------+------+-------+--------+-----+-----+-----+-----+
    | type <1b> | baseRef <8b> | baseTime <8b> | dref | dtime  |  val | dref  |  dtime | val | ... | ... | ... | 
    +-----------+--------------+---------------+------+--------+------+-------+--------+-----+-----+-----+-----+
        byte        uint64          int64      Varint64 Varint64 uint64
    

    其中RefSample:

    RefSample{
        Ref: uint64(int64(baseRef) + dref),
        T:   baseTime + dtime,
        V:   math.Float64frombits(val),
    }
    
    • 如果类型为Tombstones
    // Tombstones appends tombstones in rec to the given slice.
    // Tombstones decodes a RecordTombstones record and appends its stones to
    // the given slice, returning the extended slice.
    //
    // After the type byte the record is a sequence of entries. Each entry is
    // an 8-byte big-endian series ref followed by the varint-encoded mint and
    // maxt of one deleted interval, so every decoded Stone carries exactly one
    // interval.
    func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
        buf := encoding.Decbuf{B: rec}

        if typ := RecordType(buf.Byte()); typ != RecordTombstones {
            return nil, errors.New("invalid record type")
        }
        for buf.Err() == nil && buf.Len() > 0 {
            // Read the fields into locals so the decoding order (ref, mint,
            // maxt) is explicit instead of implied by composite-literal order.
            ref := buf.Be64()
            mint := buf.Varint64()
            maxt := buf.Varint64()

            tstones = append(tstones, Stone{
                ref:       ref,
                intervals: Intervals{{Mint: mint, Maxt: maxt}},
            })
        }
        if err := buf.Err(); err != nil {
            return nil, err
        }
        if n := len(buf.B); n > 0 {
            return nil, errors.Errorf("unexpected %d bytes left in entry", n)
        }
        return tstones, nil
    }
    

    具体格式如下:

    +-----------+----------+------+--------+-----------+------+---------+-----+-----+-----+
    | type <1b> | ref <8b> | minT |  maxT  |  ref <8b> | minT |  maxT   | ... | ... | ... |
    +-----------+----------+------+--------+-----------+------+---------+-----+-----+-----+
        byte      uint64   Varint64 Varint64 
    

    其中Stone:

    Stone{
        ref: dec.Be64(),
        intervals: Intervals{
            {
                Mint: dec.Varint64(), 
                Maxt: dec.Varint64()
            },
        }
    }
    

    问题点

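    • SegmentRangeReader中读取一条record后会先判断其type,第一个字节表示其类型吗?目前还没看到。(从RecordDecoder.Series/Samples/Tombstones的实现看,每个decoder都先用dec.Byte()读取record的第一个字节,并与对应的RecordType比较,因此第一个字节确实就是record的类型。)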
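    • RecordDecoder.Tombstones中每个Stone只对应一个Interval吗?(从解码代码看是的:每个条目只包含ref、minT和maxT,解码出的Stone只有一个Interval。同一series的多个删除区间应当被编码为多个条目,这一点需要结合RecordEncoder.Tombstones确认。)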

    References

    相关文章

      网友评论

          本文标题:Prometheus checkpoint源码阅读

          本文链接:https://www.haomeiwen.com/subject/lulnmctx.html