美文网首页
prometheus/tsdb 的源码阅读笔记 0x01

prometheus/tsdb 的源码阅读笔记 0x01

作者: 逆麟囧 | 来源:发表于2018-01-09 17:44 被阅读418次

分析一下
github.com/prometheus/prometheus/tsdb/chunkenc

github.com/prometheus/prometheus/tsdb/chunks
这两个 pkg

1. chunkenc

chunkenc
├── bstream.go
├── chunk.go
├── chunk_test.go
└── xor.go

chunkenc 提供了时序数据点的编码格式.

它定义了一个 Chunk 接口 及其附属的 AppenderIterator接口.

此外给出了 Chunk 的一个实现 XORChunk .

Chunk, Appender, Iterator

定义了数据块Chunk, 其为一组数据点的集合.

可以通过 Appender 继续写入, 及通过 Iterator 遍历已有的数据点.

这里明确指出了一个数据点是一对 时间戳 (int64) 和值 (float64).

Pool

Chunk 对象池的定义, 并给出了一个基于内存的实现.

XORChunk

这是目前给出的唯一一个 Chunk 实现, 使用了 Gorilla 的算法思路.

func (a *xorAppender) Append(t int64, v float64) {
    var tDelta uint64
    num := binary.BigEndian.Uint16(a.b.bytes())

    if num == 0 {
        // ...

    } else if num == 1 {
        // ...

    } else {
        tDelta = uint64(t - a.t)
        dod := int64(tDelta - a.tDelta)

        // Gorilla has a max resolution of seconds, Prometheus milliseconds.
        // Thus we use higher value range steps with larger bit size.
        switch {
        case dod == 0:
            a.b.writeBit(zero)
        case bitRange(dod, 14):
            a.b.writeBits(0x02, 2) // '10'
            a.b.writeBits(uint64(dod), 14)
        case bitRange(dod, 17):
            a.b.writeBits(0x06, 3) // '110'
            a.b.writeBits(uint64(dod), 17)
        case bitRange(dod, 20):
            a.b.writeBits(0x0e, 4) // '1110'
            a.b.writeBits(uint64(dod), 20)
        default:
            a.b.writeBits(0x0f, 4) // '1111'
            a.b.writeBits(uint64(dod), 64)
        }

        a.writeVDelta(v)
    }

    a.t = t
    a.v = v
    binary.BigEndian.PutUint16(a.b.bytes(), num+1)
    a.tDelta = tDelta
}

上述代码的 switch 部分对 dod (delta of delta) 的大小范围进行判定, 以确定一个最多 4bit 的标识, 及标识后的数据长度.

这里为了支持毫秒级的时间精度 (原始算法中为秒级), 对每一级的范围和长度做了调整.

数据点的压缩比率会受到一些影响, 但能适应更多的使用场景.

bstream

XORChunk 写入和读取点数据都依赖于 bstream提供的 bit 流读写能力, 核心是

func (b *bstream) writeBit(bit bit) {
    // ...
}

func (b *bstream) readBit() (bit, error) {
    // ...
}
func (b *bstream) writeByte(byt byte) {
    // ...
}

func (b *bstream) readByte() (byte, error) {
    // ...
}
func (b *bstream) writeBits(u uint64, nbits int) {
    // ...
}

func (b *bstream) readBits(nbits int) (uint64, error) {
    // ...
}

三组方法.

2. chunks

chunks
└── chunks.go

chunks 是 chunk 数据持久化的实现

Meta

Chunk 的元数据

Writer

是一个基于文件目录的 ChunkWriter 实现.

执行一组 Chunk 的写入, 并按体积进行分片, 每一片称为一个 sequenceFile.

  1. 结构体定义

    // Writer implements the ChunkWriter interface for the standard
    // serialization format.
    type Writer struct {
     // 当前目录
     dirFile *os.File
    
     // 所有用于写入的数据文件, 只有最后一个是当前有效的
     files   []*os.File
     wbuf    *bufio.Writer
     
     // 当前分片文件已写入的字节数
     n       int64
    
     // 复用的 crc32, 用于每一个写入的 Chunk 的校验
     crc32   hash.Hash
    
     // 分片的尺寸, 目前是 512 << 20
     segmentSize int64
    }
    
  2. Writer.finalizeTail & Writer.cut

    // 安全地关闭当前用于写入的文件
    func (w *Writer) finalizeTail() error {
     // ...
     
     // As the file was pre-allocated, we truncate any superfluous zero bytes.
     // 由于每个 seq file 都会预先分配空间, 因此需要按照实际使用量进行一次 Truncate
     off, err := tf.Seek(0, os.SEEK_CUR)
     if err != nil {
         return err
     }
     if err := tf.Truncate(off); err != nil {
         return err
     }
    
     return tf.Close()
    }
    
    func (w *Writer) cut() error {
     // Sync current tail to disk and close.
     if err := w.finalizeTail(); err != nil {
         return err
     }
    
         // 打开一个新文件用于数据写入
     p, _, err := nextSequenceFile(w.dirFile.Name())
     if err != nil {
         return err
     }
     f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
     if err != nil {
         return err
     }
      
         // 为文件预分配 segmentSize 大小
     if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
         return err
     }
     if err = w.dirFile.Sync(); err != nil {
         return err
     }
    
     // Write header metadata for new file.
    
     metab := make([]byte, 8)
     binary.BigEndian.PutUint32(metab[:4], MagicChunks)
     metab[4] = chunksFormatV1
    
     if _, err := f.Write(metab); err != nil {
         return err
     }
    
         // 重置或初始化一个 bufio.Writer
     w.files = append(w.files, f)
     if w.wbuf != nil {
         w.wbuf.Reset(f)
     } else {
         w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
     }
      
         // 重置已写入的字节数, 8 是 文件头 MagicChunks 占用的大小
         // 即前面 `Write header metadata for new file.` 的 b 部分
     w.n = 8
    
     return nil
    }
    
  3. Writer.WriteChunks

    func (w *Writer) WriteChunks(chks ...Meta) error {
     // Calculate maximum space we need and cut a new segment in case
     // we don't fit into the current one.
         // 计算所有 chunks 可能占用的空间
     maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
     for _, c := range chks {
         maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
         maxLen += int64(len(c.Chunk.Bytes()))
     }
     newsz := w.n + maxLen
    
      // 根据这里执行 w.cut() 的判断条件, 实际上是可能出现单个文件超过 w.segmentSize 的
     if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
         if err := w.cut(); err != nil {
             return err
         }
     }
    
     var (
         // 初始化 b 作为写入长度, 写入 chunk 编码方式, 计算 hash 时等的 buffer
         b   = [binary.MaxVarintLen32]byte{}
         seq = uint64(w.seq()) << 32
     )
     for i := range chks {
         chk := &chks[i]
    
         // 用于定位 Chunk 
         chk.Ref = seq | uint64(w.n)
    
         // ...
    
         // 校验数据
         w.crc32.Reset()
         if err := chk.writeHash(w.crc32); err != nil {
             return err
         }
         if err := w.write(w.crc32.Sum(b[:0])); err != nil {
             return err
         }
     }
    
     return nil
    }
    
ByteSlice

用于 Reader 中逐段读取数据

Reader

用于读取数据块

  1. NewDirReader

    // NewDirReader returns a new Reader against sequentially numbered files in the
    // given directory.
    func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
         // 根据命名规则得到所有数据文件
     files, err := sequenceFiles(dir)
     if err != nil {
         return nil, err
     }
      
         // 初始化一个 chunkenc.Pool
     if pool == nil {
         pool = chunkenc.NewPool()
     }
    
     var bs []ByteSlice
     var cs []io.Closer
    
     for _, fn := range files {
         f, err := fileutil.OpenMmapFile(fn)
         if err != nil {
             return nil, errors.Wrapf(err, "mmap files")
         }
         cs = append(cs, f)
         bs = append(bs, realByteSlice(f.Bytes()))
     }
     return newReader(bs, cs, pool)
    }
    
  2. Reader.Chunk

    // 根据定位读取指定 Chunk
    func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
     // 分别计算文件所在的位置, 和 Chunk 数据的起始位置
     var (
         seq = int(ref >> 32)
         off = int((ref << 32) >> 32)
     )
     
     // ...
    
     // 将数据封装成一个 chunkenc.Chunk
     return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l])
    }
    

相关文章

网友评论

      本文标题:prometheus/tsdb 的源码阅读笔记 0x01

      本文链接:https://www.haomeiwen.com/subject/myjgnxtx.html