prometheus/tsdb source reading notes 0x03

Author: 逆麟囧 | Published 2018-01-17 11:56

    The previous posts in this series walked through the individual packages under prometheus/tsdb.
    This post analyzes the implementation of prometheus/tsdb itself as a whole.

    tombstones.go

    Stone

    A Stone marks data that has been deleted.

    // Stone holds the information on the posting and time-range
    // that is deleted.
    type Stone struct {
        ref       uint64
        intervals Intervals
    }
    
    Interval, Intervals

    These record time ranges.

    // Interval represents a single time-interval.
    type Interval struct {
        Mint, Maxt int64
    }
    
    func (tr Interval) inBounds(t int64) bool {
        return t >= tr.Mint && t <= tr.Maxt
    }
    
    func (tr Interval) isSubrange(dranges Intervals) bool {
        for _, r := range dranges {
            if r.inBounds(tr.Mint) && r.inBounds(tr.Maxt) {
                return true
            }
        }
    
        return false
    }
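
    A quick illustration of the semantics above, as a hypothetical in-package snippet (the types are unexported, so this only compiles inside the tsdb package; the fmt import is elided). Note that isSubrange returns true only when a single interval covers the whole range:

    func exampleIntervalSemantics() {
        iv := Interval{Mint: 10, Maxt: 20}

        fmt.Println(iv.inBounds(15)) // true: 15 lies within [10, 20]
        fmt.Println(iv.inBounds(25)) // false

        // true: one interval covers both Mint and Maxt
        fmt.Println(iv.isSubrange(Intervals{{0, 30}}))
        // false: no single interval covers [10, 20] on its own
        fmt.Println(iv.isSubrange(Intervals{{0, 15}, {16, 30}}))
    }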
    
    TombstoneReader
    // TombstoneReader gives access to tombstone intervals by series reference.
    type TombstoneReader interface {
        // Get returns deletion intervals for the series with the given reference.
        Get(ref uint64) (Intervals, error)
    
        // Iter calls the given function for each encountered interval.
        Iter(func(uint64, Intervals) error) error
    
        // Close any underlying resources
        Close() error
    }
    

    An in-memory implementation is provided:

    type memTombstones map[uint64]Intervals
    
    var emptyTombstoneReader = memTombstones{}
    
    // EmptyTombstoneReader returns a TombstoneReader that is always empty.
    func EmptyTombstoneReader() TombstoneReader {
        return emptyTombstoneReader
    }
    
    func (t memTombstones) Get(ref uint64) (Intervals, error) {
        return t[ref], nil
    }
    
    func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
        for ref, ivs := range t {
            if err := f(ref, ivs); err != nil {
                return err
            }
        }
        return nil
    }
    
    func (t memTombstones) add(ref uint64, itv Interval) {
        t[ref] = t[ref].add(itv)
    }
    
    func (memTombstones) Close() error {
        return nil
    }
    

    The contents of a TombstoneReader can be written to a file and read back from it.

    func writeTombstoneFile(dir string, tr TombstoneReader) error {
        path := filepath.Join(dir, tombstoneFilename)
        tmp := path + ".tmp"
        
        // ...
    
        return renameFile(tmp, path)
    }
    
    func readTombstones(dir string) (memTombstones, error) {
        b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
        // ...
    
        stonesMap := memTombstones{}
    
        for d.len() > 0 {
            // ...
            stonesMap.add(k, Interval{mint, maxt})
        }
    
        return stonesMap, nil
    }
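
    Putting the pieces together, a minimal in-package sketch (a hypothetical helper, not code from the repository; fmt import elided) that builds an in-memory tombstone set, persists it, and reads it back:

    func tombstonesRoundTrip(dir string) error {
        stones := memTombstones{}
        stones.add(1, Interval{Mint: 1000, Maxt: 2000}) // series ref 1: delete [1000, 2000]

        if err := writeTombstoneFile(dir, stones); err != nil {
            return err
        }

        restored, err := readTombstones(dir)
        if err != nil {
            return err
        }
        ivs, err := restored.Get(1)
        if err != nil {
            return err
        }
        fmt.Println(ivs) // [{1000 2000}]
        return nil
    }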
    

    wal.go

    prometheus/tsdb first writes several kinds of data to WAL (write-ahead log) files.

    // WALEntryType indicates what data a WAL entry contains.
    type WALEntryType uint8
    
    // Entry types in a segment file.
    const (
        WALEntrySymbols WALEntryType = 1
        WALEntrySeries  WALEntryType = 2
        WALEntrySamples WALEntryType = 3
        WALEntryDeletes WALEntryType = 4
    )
    
    // WAL is a write ahead log that can log new series labels and samples.
    // It must be completely read before new entries are logged.
    type WAL interface {
        Reader() WALReader
        LogSeries([]RefSeries) error
        LogSamples([]RefSample) error
        LogDeletes([]Stone) error
        Truncate(mint int64, keep func(uint64) bool) error
        Close() error
    }
    
    // WALReader reads entries from a WAL.
    type WALReader interface {
        Read(
            seriesf func([]RefSeries),
            samplesf func([]RefSample),
            deletesf func([]Stone),
        ) error
    }
    

    The related data structures are defined as follows:

    // RefSeries is the series labels with the series ID.
    type RefSeries struct {
        Ref    uint64
        Labels labels.Labels
    }
    
    // RefSample is a timestamp/value pair associated with a reference to a series.
    type RefSample struct {
        Ref uint64
        T   int64
        V   float64
    
        // in-memory series data; analyzed in more detail in a later post
        series *memSeries
    }
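
    A minimal sketch of driving the WAL interface (w is assumed to be an already-opened WAL, e.g. obtained from OpenSegmentWAL; labels.FromStrings is assumed from the tsdb/labels package; error handling simplified):

    func walSketch(w WAL) error {
        // Record a new series and a first sample for it.
        err := w.LogSeries([]RefSeries{
            {Ref: 1, Labels: labels.FromStrings("__name__", "up", "job", "node")},
        })
        if err != nil {
            return err
        }
        if err := w.LogSamples([]RefSample{{Ref: 1, T: 1000, V: 1}}); err != nil {
            return err
        }

        // On restart, the WAL must be read back completely before new entries are logged.
        return w.Reader().Read(
            func(series []RefSeries) { /* rebuild in-memory series */ },
            func(samples []RefSample) { /* re-append samples */ },
            func(stones []Stone) { /* re-apply deletions */ },
        )
    }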
    
    
    SegmentWAL

    This is an implementation of WAL that stores data in segments of 256MB each, organized in much the same way as chunks.

    Accordingly, the file-handling code is also quite similar.

    // segmentFile wraps a file object of a segment and tracks the highest timestamp
    // it contains. During WAL truncating, all segments with no higher timestamp than
    // the truncation threshold can be compacted.
    type segmentFile struct {
        *os.File
        maxTime   int64  // highest tombstone or sample timpstamp in segment
        minSeries uint64 // lowerst series ID in segment
    }
    
    // SegmentWAL is a write ahead log for series data.
    type SegmentWAL struct {
        mtx     sync.Mutex
        metrics *walMetrics
    
        dirFile *os.File
        files   []*segmentFile
    
        logger        log.Logger
        flushInterval time.Duration
        segmentSize   int64
    
        crc32 hash.Hash32
        cur   *bufio.Writer
        curN  int64
    
        // shutdown signalling
        stopc   chan struct{}
        donec   chan struct{}
        
        // operations executed in the background
        actorc  chan func() error // sequentialized background operations
        
        buffers sync.Pool
    }
    
    LogXXXX

    LogSeries, LogSamples, and LogDeletes each encode their respective data and write it to the WAL.
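
    A simplified sketch (hypothetical code based only on the struct fields shown above; locking and entry encoding omitted) of the check the write path performs: if the encoded entry would push the active segment past segmentSize, the current segment is cut and a new one is opened first.

    func (w *SegmentWAL) writeSketch(buf []byte) error {
        if w.curN+int64(len(buf)) > w.segmentSize {
            if err := w.cut(); err != nil { // finish the current segment, open the next one
                return err
            }
        }
        n, err := w.cur.Write(buf) // buffered write into the active segment
        w.curN += int64(n)
        return err
    }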

    Truncate
    // Truncate deletes the values prior to mint and the series which the keep function
    // does not indiciate to preserve.
    // Used to remove data that is no longer needed.
    func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
        // ...
    
        return nil
    }
    
    run

    When a SegmentWAL is opened via OpenSegmentWAL, the run function is started in a separate goroutine to handle the background operations passed in through actorc.

    Currently the only operation passed through actorc is cutting the segment files.
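
    A simplified sketch of that background loop (hypothetical code based on the fields shown above; the real run also flushes the active segment on a flushInterval ticker, and level is go-kit's log/level as used elsewhere in the package):

    func (w *SegmentWAL) runSketch() {
        defer close(w.donec)

        for {
            select {
            case <-w.stopc:
                return
            case f := <-w.actorc:
                // execute queued background operations one at a time
                if err := f(); err != nil {
                    level.Error(w.logger).Log("msg", "background operation failed", "err", err)
                }
            }
        }
    }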

    // cut finishes the currently active segments and opens the next one.
    // The encoder is reset to point to the new segment.
    func (w *SegmentWAL) cut() error {
        // Sync current head to disk and close.
        if hf := w.head(); hf != nil {
            if err := w.flush(); err != nil {
                return err
            }
            
            // Finish last segment asynchronously to not block the WAL moving along
            // in the new segment.
            // finish the current segment file
            go func() {
                w.actorc <- func() error {
                    off, err := hf.Seek(0, os.SEEK_CUR)
                    if err != nil {
                        return errors.Wrapf(err, "finish old segment %s", hf.Name())
                    }
                    if err := hf.Truncate(off); err != nil {
                        return errors.Wrapf(err, "finish old segment %s", hf.Name())
                    }
                    if err := hf.Sync(); err != nil {
                        return errors.Wrapf(err, "finish old segment %s", hf.Name())
                    }
                    if err := hf.Close(); err != nil {
                        return errors.Wrapf(err, "finish old segment %s", hf.Name())
                    }
                    return nil
                }
            }()
        }
    
        // initialize a new segment file for writing
        // ...
        
        return nil
    }
    

    compact.go

    Implements compaction of the underlying storage.

    // Compactor provides compaction against an underlying storage
    // of time series data.
    type Compactor interface {
        // Plan returns a set of non-overlapping directories that can
        // be compacted concurrently.
        // Results returned when compactions are in progress are undefined.
        Plan(dir string) ([]string, error)
    
        // Write persists a Block into a directory.
        Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
    
        // Compact runs compaction against the provided directories. Must
        // only be called concurrently with results of Plan().
        Compact(dest string, dirs ...string) (ulid.ULID, error)
    }
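
    A minimal sketch of driving a Compactor (roughly what the DB's compaction loop does, greatly simplified; compactOnce is a hypothetical helper):

    func compactOnce(c Compactor, dir string) error {
        plan, err := c.Plan(dir)
        if err != nil {
            return err
        }
        if len(plan) == 0 {
            return nil // nothing worth compacting right now
        }
        // Merge the planned block directories into a new block under dir.
        _, err = c.Compact(dir, plan...)
        return err
    }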
    
    LeveledCompactor

    An implementation of Compactor.

    Plan
    // Plan returns a list of compactable blocks in the provided directory.
    func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
        dirs, err := blockDirs(dir)
        
        // ...
      
        var dms []dirMeta
    
        for _, dir := range dirs {
            // read the BlockMeta, which is the basis for deciding whether a block can be compacted
            meta, err := readMetaFile(dir)
            
            // ...
        }
        return c.plan(dms)
    }
    
    populateBlock

    Both LeveledCompactor.Write and LeveledCompactor.Compact use LeveledCompactor.write, and LeveledCompactor.populateBlock contains the core logic of the write method.

    Its job is to merge the data of a set of Blocks and write the result to an IndexWriter and a ChunkWriter.

    // populateBlock fills the index and chunk writers with new data gathered as the union
    // of the provided blocks. It returns meta information for the new block.
    func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
        var (
            set        ChunkSeriesSet
            allSymbols = make(map[string]struct{}, 1<<16)
            closers    = []io.Closer{}
        )
        defer func() { closeAll(closers...) }()
    
        // iterate over the data of the old blocks
        for i, b := range blocks {
            indexr, err := b.Index()
            // ...
    
            chunkr, err := b.Chunks()
            // ...
    
            tombsr, err := b.Tombstones()
            // ...
    
            symbols, err := indexr.Symbols()
            // ...
    
            all, err := indexr.Postings(index.AllPostingsKey())
            if err != nil {
                return err
            }
            all = indexr.SortedPostings(all)
    
            s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
    
            // ...
          
            // merge with the set accumulated so far to form a new merger
            set, err = newCompactionMerger(set, s)
            if err != nil {
                return err
            }
        }
    
        // We fully rebuild the postings list index from merged series.
        // ...
    
        // iterate over the merged set
        for set.Next() {
            lset, chks, dranges := set.At() // The chunks here are not fully deleted.
    
            // Skip the series with all deleted chunks.
            // ...
    
            if err := chunkw.WriteChunks(chks...); err != nil {
                return errors.Wrap(err, "write chunks")
            }
    
            if err := indexw.AddSeries(i, lset, chks...); err != nil {
                return errors.Wrap(err, "add series")
            }
    
            // ...
        }
        
        // ...
    
        s := make([]string, 0, 256)
        for n, v := range values {
            // ...
    
            if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
                return errors.Wrap(err, "write label index")
            }
        }
    
        for _, l := range postings.SortedKeys() {
            if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil {
                return errors.Wrap(err, "write postings")
            }
        }
        return nil
    }
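
    The merging itself is handled by compactionMerger. A conceptual sketch of the underlying two-way merge (simplified: the real code also combines chunks and deletion intervals when the same series appears in both blocks; labels.Compare is assumed from tsdb/labels):

    func mergeSortedSeries(a, b []labels.Labels) []labels.Labels {
        out := make([]labels.Labels, 0, len(a)+len(b))
        i, j := 0, 0

        // Both inputs are sorted by label set, so a single pass suffices.
        for i < len(a) && j < len(b) {
            switch d := labels.Compare(a[i], b[j]); {
            case d < 0:
                out = append(out, a[i])
                i++
            case d > 0:
                out = append(out, b[j])
                j++
            default:
                // Same series in both blocks: emit it once; the real merger
                // combines its chunks and tombstone intervals here.
                out = append(out, a[i])
                i++
                j++
            }
        }
        out = append(out, a[i:]...)
        return append(out, b[j:]...)
    }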
    

    block.go

    Block
    Delete
    // Delete matching series between mint and maxt in the block.
    // As mentioned earlier, Delete only marks the data with tombstones for now; this is that implementation.
    func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
        // ...
    
        err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
            for _, iv := range ivs {
                stones.add(id, iv)
                pb.meta.Stats.NumTombstones++
            }
            return nil
        })
        if err != nil {
            return err
        }
        pb.tombstones = stones
    
        if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
            return err
        }
        return writeMetaFile(pb.dir, &pb.meta)
    }
    
    CleanTombstones
    // CleanTombstones will rewrite the block if there any tombstones to remove them
    // and returns if there was a re-write.
    func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
        numStones := 0
    
        pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
            for _ = range ivs {
                numStones++
            }
    
            return nil
        })
    
        if numStones == 0 {
            return false, nil
        }
    
        if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
            return false, err
        }
    
        return true, nil
    }
    
    Snapshot

    Open question: only hard links are created for the target directory and the files inside it, so how is the content guaranteed not to change?

    head.go

    Head

    Head is exposed to callers for reading and writing data within a certain time range.

    Head handles both the data in the WAL and the data that has already been persisted.

    Head can be thought of as the current Block.

    No Block can be written to anymore; once its writable window has passed, the Head is converted into a Block and persisted.

    Appender
    // Appender returns a new Appender on the database.
    // The concrete Appender instance returned depends on the situation.
    // There are two kinds of Appender instances:
    // initAppender initializes the Head's start time when it receives the first data point;
    // headAppender's logic is comparatively simple.
    func (h *Head) Appender() Appender {
        h.metrics.activeAppenders.Inc()
    
        // The head cache might not have a starting point yet. The init appender
        // picks up the first appended timestamp as the base.
        if h.MinTime() == math.MinInt64 {
            return &initAppender{head: h}
        }
        return h.appender()
    }
    
    func (h *Head) appender() *headAppender {
        return &headAppender{
            head:          h,
            mint:          h.MaxTime() - h.chunkRange/2,
            samples:       h.getAppendBuffer(),
            highTimestamp: math.MinInt64,
        }
    }
    

    querier.go

    Query capability is exposed to callers through the following three interfaces.

    // Querier provides querying access over time series data of a fixed
    // time range.
    type Querier interface {
        // Select returns a set of series that matches the given label matchers.
        Select(...labels.Matcher) (SeriesSet, error)
    
        // LabelValues returns all potential values for a label name.
        LabelValues(string) ([]string, error)
        // LabelValuesFor returns all potential values for a label name.
        // under the constraint of another label.
        LabelValuesFor(string, labels.Label) ([]string, error)
    
        // Close releases the resources of the Querier.
        Close() error
    }
    
    // Series exposes a single time series.
    type Series interface {
        // Labels returns the complete set of labels identifying the series.
        Labels() labels.Labels
    
        // Iterator returns a new iterator of the data of the series.
        Iterator() SeriesIterator
    }
    
    // SeriesSet contains a set of series.
    type SeriesSet interface {
        Next() bool
        At() Series
        Err() error
    }
    
    querier, blockQuerier

    blockQuerier is a Querier over a single block.

    querier is an aggregation of blockQueriers.
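
    A minimal usage sketch of these interfaces (q is assumed to be a Querier, e.g. the one returned by DB.Querier in db.go below; labels.NewEqualMatcher is assumed as the equality matcher constructor from tsdb/labels; fmt import and error handling simplified):

    func printSeries(q Querier) error {
        defer q.Close()

        set, err := q.Select(labels.NewEqualMatcher("job", "node"))
        if err != nil {
            return err
        }
        for set.Next() {
            s := set.At()
            fmt.Println(s.Labels())

            it := s.Iterator()
            for it.Next() {
                t, v := it.At() // timestamp/value pairs of the series
                fmt.Println(t, v)
            }
            if err := it.Err(); err != nil {
                return err
            }
        }
        return set.Err()
    }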

    db.go

    Appender

    Appender is the write interface; *Head implements it.

    // Appender allows appending a batch of data. It must be completed with a
    // call to Commit or Rollback and must not be reused afterwards.
    //
    // Operations on the Appender interface are not goroutine-safe.
    type Appender interface {
        // Add adds a sample pair for the given series. A reference number is
        // returned which can be used to add further samples in the same or later
        // transactions.
        // Returned reference numbers are ephemeral and may be rejected in calls
        // to AddFast() at any point. Adding the sample via Add() returns a new
        // reference number.
        // If the reference is the empty string it must not be used for caching.
        Add(l labels.Labels, t int64, v float64) (uint64, error)
    
        // Add adds a sample pair for the referenced series. It is generally faster
        // than adding a sample by providing its full label set.
        AddFast(ref uint64, t int64, v float64) error
    
        // Commit submits the collected samples and purges the batch.
        Commit() error
    
        // Rollback rolls back all modifications made in the appender so far.
        Rollback() error
    }
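
    A minimal usage sketch (db is assumed to be an open *DB, so db.Appender() below returns the wrapped head appender; labels.FromStrings is assumed from tsdb/labels; error handling simplified):

    func appendSamples(db *DB) error {
        app := db.Appender()

        lset := labels.FromStrings("__name__", "http_requests_total", "job", "api")
        ref, err := app.Add(lset, 1000, 1) // the first sample returns a reference
        if err != nil {
            return err
        }

        // Later samples for the same series can take the faster ref-based path.
        if err := app.AddFast(ref, 2000, 2); err != nil {
            // The ref may have been invalidated; fall back to Add.
            if _, err := app.Add(lset, 2000, 2); err != nil {
                return err
            }
        }
        return app.Commit()
    }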
    
    DB

    DB is the main struct exposed to callers.

    // DB handles reads and writes of time series falling into
    // a hashed partition of a seriedb.
    type DB struct {
        dir   string
        lockf *lockfile.Lockfile
    
        logger    log.Logger
        metrics   *dbMetrics
        opts      *Options
        chunkPool chunkenc.Pool
        compactor Compactor
    
        // Mutex for that must be held when modifying the general block layout.
        mtx    sync.RWMutex
        blocks []*Block
    
        head *Head
    
        compactc chan struct{}
        donec    chan struct{}
        stopc    chan struct{}
    
        // cmtx is used to control compactions and deletions.
        cmtx               sync.Mutex
        compactionsEnabled bool
    }
    
    reload
    // reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
    // a list of block directories which should be deleted during reload.
    func (db *DB) reload(deleteable ...string) (err error) {
        // ...
        
        // read all current block directories
        dirs, err := blockDirs(db.dir)
        
        // ...
        
        var (
            blocks []*Block
            exist  = map[ulid.ULID]struct{}{}
        )
    
        for _, dir := range dirs {
            meta, err := readMetaFile(dir)
            
            // ...
    
            // try to obtain the Block for this directory, first from memory, then from disk
            b, ok := db.getBlock(meta.ULID)
            if !ok {
                b, err = OpenBlock(dir, db.chunkPool)
                
                // ...
            }
    
            blocks = append(blocks, b)
            exist[meta.ULID] = struct{}{}
        }
    
        // ensure the Blocks, ordered by the time ranges they cover, form a valid sequence
        if err := validateBlockSequence(blocks); err != nil {
            return errors.Wrap(err, "invalid block sequence")
        }
    
        // ...
        
        // remove Block files that are no longer needed
        for _, b := range oldBlocks {
            if _, ok := exist[b.Meta().ULID]; ok {
                continue
            }
            if err := b.Close(); err != nil {
                level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
            }
            if err := os.RemoveAll(b.Dir()); err != nil {
                level.Warn(db.logger).Log("msg", "deleting block failed", "err", err)
            }
        }
    
        // Garbage collect data in the head if the most recent persisted block
        // covers data of its current time range.
        if len(blocks) == 0 {
            return nil
        }
        maxt := blocks[len(blocks)-1].Meta().MaxTime
    
        return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
    }
    
    run

    The run method is called from Open and executes in a separate goroutine; its main job is to periodically compact the data to save space.

    func (db *DB) run() {
        defer close(db.donec)
    
        backoff := time.Duration(0)
    
        for {
            select {
            case <-db.stopc:
                return
            case <-time.After(backoff):
            }
    
            select {
            case <-time.After(1 * time.Minute):
                select {
                case db.compactc <- struct{}{}:
                default:
                }
            case <-db.compactc:
            // compaction-related code runs here
    
            case <-db.stopc:
                return
            }
        }
    }
    
    Appender

    Returns the wrapped dbAppender, which is analyzed separately below.

    Querier

    Returns an aggregation of all Blocks within the given time range.

    // Querier returns a new querier over the data partition for the given time range.
    // A goroutine must not handle more than one open Querier.
    func (db *DB) Querier(mint, maxt int64) (Querier, error) {
        var blocks []BlockReader
    
        db.mtx.RLock()
        defer db.mtx.RUnlock()
    
        for _, b := range db.blocks {
            m := b.Meta()
            
            // pick out the blocks that overlap the requested time range
            if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
                blocks = append(blocks, b)
            }
        }
        
        // as mentioned earlier, the Head can be treated as the current Block
        if maxt >= db.head.MinTime() {
            blocks = append(blocks, db.head)
        }
    
        // aggregation of the Blocks
        sq := &querier{
            blocks: make([]Querier, 0, len(blocks)),
        }
        for _, b := range blocks {
            q, err := NewBlockQuerier(b, mint, maxt)
            if err == nil {
                sq.blocks = append(sq.blocks, q)
                continue
            }
            // If we fail, all previously opened queriers must be closed.
            for _, q := range sq.blocks {
                q.Close()
            }
            return nil, errors.Wrapf(err, "open querier for block %s", b)
        }
        return sq, nil
    }
    
    Delete

    This actually distributes the Delete operation to each affected Block.

    CleanTombstones

    As mentioned earlier, the logic inside each Block's Delete actually only writes the WAL and the tombstone file.

    Here all current Blocks are actually cleaned up, after which the reload method is called.

    dbAppender

    A wrapper around *headAppender that triggers compaction when Commit is called.

    // Appender opens a new appender against the database.
    func (db *DB) Appender() Appender {
        return dbAppender{db: db, Appender: db.head.Appender()}
    }
    
    // dbAppender wraps the DB's head appender and triggers compactions on commit
    // if necessary.
    type dbAppender struct {
        Appender
        db *DB
    }
    
    func (a dbAppender) Commit() error {
        err := a.Appender.Commit()
    
        // We could just run this check every few minutes practically. But for benchmarks
        // and high frequency use cases this is the safer way.
        if a.db.head.MaxTime()-a.db.head.MinTime() > a.db.head.chunkRange/2*3 {
            select {
            case a.db.compactc <- struct{}{}:
            default:
            }
        }
        return err
    }
    

    Summary

    The structs in prometheus/tsdb (hereafter ptsdb) can roughly be layered as follows:

    • DB: the core object exposed to callers

      • Block: persisted time series data covering a certain time range. A Block contains:
        • Index: index data for the labels
        • Chunk: the timestamp/sample-value data
    • Head: ptsdb requires data to be written in increasing time order, and persisted Blocks can no longer be written to, so at any moment there is only one writable Block, the Head. The Head is also responsible for recording deletions.
      • WAL: additions, deletions, and modifications are written to the WAL first, for later recovery.
      • Tombstone: marks deletions; the marked data is cleaned up during compaction.
    • Compactor: compacts the files. Block data is organized along the lines of an LSM tree, so the Compactor implementation resembles that of LSM-based KV databases.

    For a more high-level discussion of ptsdb, see the article 时间序列数据的存储和计算 - 开源时序数据库解析(四).
