美文网首页
prometheus/tsdb 的源码阅读笔记 0x01

prometheus/tsdb 的源码阅读笔记 0x01

作者: 逆麟囧 | 来源:发表于2018-01-09 17:44 被阅读418次

    分析一下
    github.com/prometheus/prometheus/tsdb/chunkenc

    github.com/prometheus/prometheus/tsdb/chunks
    这两个 pkg

    1. chunkenc

    chunkenc
    ├── bstream.go
    ├── chunk.go
    ├── chunk_test.go
    └── xor.go
    

    chunkenc 提供了时序数据点的编码格式.

    它定义了一个 Chunk 接口 及其附属的 AppenderIterator接口.

    此外给出了 Chunk 的一个实现 XORChunk .

    Chunk, Appender, Iterator

    定义了数据块Chunk, 其为一组数据点的集合.

    可以通过 Appender 继续写入, 及通过 Iterator 遍历已有的数据点.

    这里明确指出了一个数据点是一对 时间戳 (int64) 和值 (float64).

    Pool

    Chunk 对象池的定义, 并给出了一个基于内存的实现.

    XORChunk

    这是目前给出的唯一一个 Chunk 实现, 使用了 Gorilla 的算法思路.

    func (a *xorAppender) Append(t int64, v float64) {
        var tDelta uint64
        num := binary.BigEndian.Uint16(a.b.bytes())
    
        if num == 0 {
            // ...
    
        } else if num == 1 {
            // ...
    
        } else {
            tDelta = uint64(t - a.t)
            dod := int64(tDelta - a.tDelta)
    
            // Gorilla has a max resolution of seconds, Prometheus milliseconds.
            // Thus we use higher value range steps with larger bit size.
            switch {
            case dod == 0:
                a.b.writeBit(zero)
            case bitRange(dod, 14):
                a.b.writeBits(0x02, 2) // '10'
                a.b.writeBits(uint64(dod), 14)
            case bitRange(dod, 17):
                a.b.writeBits(0x06, 3) // '110'
                a.b.writeBits(uint64(dod), 17)
            case bitRange(dod, 20):
                a.b.writeBits(0x0e, 4) // '1110'
                a.b.writeBits(uint64(dod), 20)
            default:
                a.b.writeBits(0x0f, 4) // '1111'
                a.b.writeBits(uint64(dod), 64)
            }
    
            a.writeVDelta(v)
        }
    
        a.t = t
        a.v = v
        binary.BigEndian.PutUint16(a.b.bytes(), num+1)
        a.tDelta = tDelta
    }
    

    上述代码的 switch 部分对 dod (delta of delta) 的大小范围进行判定, 以确定一个最多 4bit 的标识, 及标识后的数据长度.

    这里为了支持毫秒级的时间精度 (原始算法中为秒级), 对每一级的范围和长度做了调整.

    数据点的压缩比率会受到一些影响, 但能适应更多的使用场景.

    bstream

    XORChunk 写入和读取点数据都依赖于 bstream提供的 bit 流读写能力, 核心是

    func (b *bstream) writeBit(bit bit) {
        // ...
    }
    
    func (b *bstream) readBit() (bit, error) {
        // ...
    }
    
    func (b *bstream) writeByte(byt byte) {
        // ...
    }
    
    func (b *bstream) readByte() (byte, error) {
        // ...
    }
    
    func (b *bstream) writeBits(u uint64, nbits int) {
        // ...
    }
    
    func (b *bstream) readBits(nbits int) (uint64, error) {
        // ...
    }
    

    三组方法.

    2. chunks

    chunks
    └── chunks.go
    

    chunks 是 chunk 数据持久化的实现

    Meta

    Chunk 的元数据

    Writer

    是一个基于文件目录的 ChunkWriter 实现.

    执行一组 Chunk 的写入, 并按体积进行分片, 每一片称为一个 sequenceFile.

    1. 结构体定义

      // Writer implements the ChunkWriter interface for the standard
      // serialization format.
      type Writer struct {
       // 当前目录
       dirFile *os.File
      
       // 所有用于写入的数据文件, 只有最后一个是当前有效的
       files   []*os.File
       wbuf    *bufio.Writer
       
       // 当前分片文件已写入的字节数
       n       int64
      
       // 复用的 crc32, 用于每一个写入的 Chunk 的校验
       crc32   hash.Hash
      
       // 分片的尺寸, 目前是 512 << 20
       segmentSize int64
      }
      
    2. Writer.finalizeTail & Writer.cut

      // 安全地关闭当前用于写入的文件
      func (w *Writer) finalizeTail() error {
       // ...
       
       // As the file was pre-allocated, we truncate any superfluous zero bytes.
       // 由于每个 seq file 都会预先分配空间, 因此需要按照实际使用量进行一次 Truncate
       off, err := tf.Seek(0, os.SEEK_CUR)
       if err != nil {
           return err
       }
       if err := tf.Truncate(off); err != nil {
           return err
       }
      
       return tf.Close()
      }
      
      func (w *Writer) cut() error {
       // Sync current tail to disk and close.
       if err := w.finalizeTail(); err != nil {
           return err
       }
      
           // 打开一个新文件用于数据写入
       p, _, err := nextSequenceFile(w.dirFile.Name())
       if err != nil {
           return err
       }
       f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
       if err != nil {
           return err
       }
        
           // 为文件预分配 segmentSize 大小
       if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
           return err
       }
       if err = w.dirFile.Sync(); err != nil {
           return err
       }
      
       // Write header metadata for new file.
      
       metab := make([]byte, 8)
       binary.BigEndian.PutUint32(metab[:4], MagicChunks)
       metab[4] = chunksFormatV1
      
       if _, err := f.Write(metab); err != nil {
           return err
       }
      
           // 重置或初始化一个 bufio.Writer
       w.files = append(w.files, f)
       if w.wbuf != nil {
           w.wbuf.Reset(f)
       } else {
           w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
       }
        
           // 重置已写入的字节数, 8 是 文件头 MagicChunks 占用的大小
           // 即前面 `Write header metadata for new file.` 的 b 部分
       w.n = 8
      
       return nil
      }
      
    3. Writer.WriteChunks

      func (w *Writer) WriteChunks(chks ...Meta) error {
       // Calculate maximum space we need and cut a new segment in case
       // we don't fit into the current one.
           // 计算所有 chunks 可能占用的空间
       maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
       for _, c := range chks {
           maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
           maxLen += int64(len(c.Chunk.Bytes()))
       }
       newsz := w.n + maxLen
      
        // 根据这里执行 w.cut() 的判断条件, 实际上是可能出现单个文件超过 w.segmentSize 的
       if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
           if err := w.cut(); err != nil {
               return err
           }
       }
      
       var (
           // 初始化 b 作为写入长度, 写入 chunk 编码方式, 计算 hash 时等的 buffer
           b   = [binary.MaxVarintLen32]byte{}
           seq = uint64(w.seq()) << 32
       )
       for i := range chks {
           chk := &chks[i]
      
           // 用于定位 Chunk 
           chk.Ref = seq | uint64(w.n)
      
           // ...
      
           // 校验数据
           w.crc32.Reset()
           if err := chk.writeHash(w.crc32); err != nil {
               return err
           }
           if err := w.write(w.crc32.Sum(b[:0])); err != nil {
               return err
           }
       }
      
       return nil
      }
      
    ByteSlice

    用于 Reader 中逐段读取数据

    Reader

    用于读取数据块

    1. NewDirReader

      // NewDirReader returns a new Reader against sequentially numbered files in the
      // given directory.
      func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
           // 根据命名规则得到所有数据文件
       files, err := sequenceFiles(dir)
       if err != nil {
           return nil, err
       }
        
           // 初始化一个 chunkenc.Pool
       if pool == nil {
           pool = chunkenc.NewPool()
       }
      
       var bs []ByteSlice
       var cs []io.Closer
      
       for _, fn := range files {
           f, err := fileutil.OpenMmapFile(fn)
           if err != nil {
               return nil, errors.Wrapf(err, "mmap files")
           }
           cs = append(cs, f)
           bs = append(bs, realByteSlice(f.Bytes()))
       }
       return newReader(bs, cs, pool)
      }
      
    2. Reader.Chunk

      // 根据定位读取指定 Chunk
      func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
       // 分别计算文件所在的位置, 和 Chunk 数据的起始位置
       var (
           seq = int(ref >> 32)
           off = int((ref << 32) >> 32)
       )
       
       // ...
      
       // 将数据封装成一个 chunkenc.Chunk
       return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l])
      }
      

    相关文章

      网友评论

          本文标题:prometheus/tsdb 的源码阅读笔记 0x01

          本文链接:https://www.haomeiwen.com/subject/myjgnxtx.html