Influxdb中TSM文件结构解析之WAL

作者: 扫帚的影子 | 来源:发表于2019-01-03 13:16 被阅读0次

    存储在Influxdb中的数据类型

    存储每条数据时的时间戳类型
    • time
    Field字段的类型
    • interger - int64
    • unsigned - uint64
    • float64
    • boolean
    • string
    Field字段的类型在源码中对应类型

    对应的类型是Value,这是个interface,定义在tsdb/engine/tsm1/encoding.go

    • IntegerValue
    • UnsignedValue
    • FloatValue
    • BooleanValue
    • StringValue
      上面的每个类型都包括一个时间戳,这个时间戳就是这个数据被写入时的时间戳,我们看一下FloadValue的定义:
    type FloatValue struct {
        unixnano int64
        value    float64
    }
    
    编解码

    每种类型在存储时都需要作编码,尽可能地作压缩,所有针对各个类型均提供了Encoder和Decoder。
    这些Encoder负责将一组相同类型的Value作压缩编码,具体的编码算法我们这里不再展开。
    我们针对FloatValue作一下分析encodeFloatBlockUsing
    参数中values []Value就是一系列的FloatValue`,不仅包括Float值,还包括对应的时间戳,都需要被编码

    func encodeFloatBlockUsing(buf []byte, values []Value, tsenc TimeEncoder, venc *FloatEncoder) ([]byte, error) {
        tsenc.Reset()
        venc.Reset()
    
        for _, v := range values {
            vv := v.(FloatValue)
            tsenc.Write(vv.unixnano) //使用TimeEncoder编码每个时间戳
            venc.Write(vv.value) //使用FloatEncoder编码每个Float值
        }
        venc.Flush()
    
        // Encoded timestamp values
        tb, err := tsenc.Bytes()
        if err != nil {
            return nil, err
        }
        // Encoded float values
        vb, err := venc.Bytes()
        if err != nil {
            return nil, err
        }
    
        // Prepend the first timestamp of the block in the first 8 bytes and the block
        // in the next byte, followed by the block
        // 将这一组FloatValue打包到一个Block
        return packBlock(buf, BlockFloat64, tb, vb), nil
    }
    
    打包到DataBlock

    DataBlock是写入和读取TSM文件的最小单位,每个DataBlock里存储的都是同样类型的Value,每个DataBlock里的Value对应都是同一个写入的Key,这个Key是series key + field;
    Influxdb算是列存储,在这里所有的Value是连续存在一起,这些Value对应的时间戳也是连续存在一起,这样更有利于作压缩

    influxdb_data_block.png

    这个结构中并没有记录Values部分的长度,这是因为我们记录了时间戳部分的总长,在解析时间戳部分时候我们可以得知有几个时间戳,也就知道了有几个Value。

    我们来看一下打包过程,结合上面的结构图,这个过程就很简单了:

    func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte {
        // We encode the length of the timestamp block using a variable byte encoding.
        // This allows small byte slices to take up 1 byte while larger ones use 2 or more.
        sz := 1 + binary.MaxVarintLen64 + len(ts) + len(values)
        if cap(buf) < sz {
            buf = make([]byte, sz)
        }
        b := buf[:sz]
        b[0] = typ
        i := binary.PutUvarint(b[1:1+binary.MaxVarintLen64], uint64(len(ts)))
        i += 1
    
        // block is <len timestamp bytes>, <ts bytes>, <value bytes>
        copy(b[i:], ts)
        // We don't encode the value length because we know it's the rest of the block after
        // the timestamp block.
        copy(b[i+len(ts):], values)
        return b[:i+len(ts)+len(values)]
    }
    
    解包DataBlock

    我们还以FloatValue为例

    func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) {
        // Block type is the next block, make sure we actually have a float block
        blockType := block[0]
        if blockType != BlockFloat64 {
            return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType)
        }
        
        // 跳过1字节的block type
        block = block[1:]
    
        tb, vb, err := unpackBlock(block)
        if err != nil {
            return nil, err
        }
    
        //计算有多少组Value
        sz := CountTimestamps(tb)
    
        if cap(*a) < sz {
            *a = make([]FloatValue, sz)
        } else {
            *a = (*a)[:sz]
        }
    
        tdec := timeDecoderPool.Get(0).(*TimeDecoder)
        vdec := floatDecoderPool.Get(0).(*FloatDecoder)
    
        var i int
        err = func(a []FloatValue) error {
            // Setup our timestamp and value decoders
            tdec.Init(tb)
            err = vdec.SetBytes(vb)
            if err != nil {
                return err
            }
    
            // Decode both a timestamp and value
            j := 0
            for j < len(a) && tdec.Next() && vdec.Next() {
                a[j] = FloatValue{unixnano: tdec.Read(), value: vdec.Values()}
                j++
            }
            i = j
    
            // Did timestamp decoding have an error?
            err = tdec.Error()
            if err != nil {
                return err
            }
    
            // Did float decoding have an error?
            return vdec.Error()
        }(*a)
        
            timeDecoderPool.Put(tdec)
        floatDecoderPool.Put(vdec)
    
        return (*a)[:i], err
    
    Dabablock的其他操作
    • BlockType:取block byte buffer的第一个字节
    func BlockType(block []byte) (byte, error) {
        blockType := block[0]
        switch blockType {
        case BlockFloat64, BlockInteger, BlockUnsigned, BlockBoolean, BlockString:
            return blockType, nil
        default:
            return 0, fmt.Errorf("unknown block type: %d", blockType)
        }
    }
    
    • BlockCount: 获取一个DabaBlock中包含的Value数量
    func BlockCount(block []byte) int {
        if len(block) <= encodedBlockHeaderSize {
            panic(fmt.Sprintf("count of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
        }
        // first byte is the block type
        tb, _, err := unpackBlock(block[1:])
        if err != nil {
            panic(fmt.Sprintf("BlockCount: error unpacking block: %s", err.Error()))
        }
        return CountTimestamps(tb)
    }
    
    • DecodeBlock: 解码一个DabaBlock,根据BlockType的不同调用不同的Decode方法
    func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
        if len(block) <= encodedBlockHeaderSize {
            panic(fmt.Sprintf("decode of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
        }
    
        blockType, err := BlockType(block)
        if err != nil {
            return nil, err
        }
    
        switch blockType {
        case BlockFloat64:
            var buf []FloatValue
            decoded, err := DecodeFloatBlock(block, &buf)
            if len(vals) < len(decoded) {
                vals = make([]Value, len(decoded))
            }
            for i := range decoded {
                vals[i] = decoded[i]
            }
            return vals[:len(decoded)], err
        case BlockInteger:
            ...
        case BlockUnsigned:
            ...
        case BlockBoolean:
            ...
        case BlockString:
            ...
        default:
            panic(fmt.Sprintf("unknown block type: %d", blockType))
        }
    }
    

    WALEntry

    1. WAL在写入TSM文件时用作预写日志。
    2. 每个DB的每个RetentionPolicy下面的每个Shard下都有自己的一个单独的WAL文件目录,Influxdb在启动的配置文件中需设置单独的WAL目录,来存储所有Shard的WAL文件。
    3. 每个Shard都对应一个WAL目录,目录下有多个wal文件,每个称作一个WALSegment,默认大小是10M。文件命名规则是,以_开头,中间是ID,扩展名是wal, 比如 _00001.wal
    4. 每次写入WAL的内容称为一个WALEntry, 在写入和读取这个Entry时需要序列化和反序列化,我们先来看一下其定义:
    type WALEntry interface {
        Type() WalEntryType  // Entry的类型: WriteWALEntry, DeleteWALEntry, DeleteRangeWALEntry
        Encode(dst []byte) ([]byte, error)
        MarshalBinary() ([]byte, error) //使用上面的Encode方法作序列化
        UnmarshalBinary(b []byte) error //反序列化
        MarshalSize() int
    }
    

    我们下面来分析一下具体的三种WALEntry

    WriteWALEntry
    • 一组point组成一个WriteEALEntry,然后写入WALSegment;
      point是一个series对应的一些field的集合,每个point被唯一的series + timestamp标识,可以简单将point理解为就是一个insert语句写入的内容。
    • 定义:
    type WriteWALEntry struct {
        Values map[string][]Value
        sz     int
    }
    

    其中Valuse是个map,它的key是series key + field, 它的value是具有相同的key的所有field value;其实就是把多个point按series key + field作了合并

    • 结构图


      influxdb_write_wal_entry.png
    • 序列化Encode:完全按照上面的结构图来写入,比较清晰明了

    func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
        // 计算总大小,欲分配内存
        encLen := w.MarshalSize() // Type (1), Key Length (2), and Count (4) for each key
    
        // allocate or re-slice to correct size
        if len(dst) < encLen {
            dst = make([]byte, encLen)
        } else {
            dst = dst[:encLen]
        }
    
        // Finally, encode the entry
        var n int
        var curType byte
    
        // 遍历Values,逐一编码
        for k, v := range w.Values {
            // 确定field的类型
            switch v[0].(type) {
            case FloatValue:
                curType = float64EntryType
            case IntegerValue:
                curType = integerEntryType
            case UnsignedValue:
                curType = unsignedEntryType
            case BooleanValue:
                curType = booleanEntryType
            case StringValue:
                curType = stringEntryType
            default:
                return nil, fmt.Errorf("unsupported value type: %T", v[0])
            }
            
            // 写入类型
            dst[n] = curType
            n++
     
            // 写入key长度,key = series key + field
            binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k)))
            n += 2
            // 写入 key
            n += copy(dst[n:], k)
    
            // 写入 value个数
            binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v)))
            n += 4
    
            // 逐一写入合部的value
            for _, vv := range v {
                binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano()))
                n += 8
    
                switch vv := vv.(type) {
                case FloatValue:
                    if curType != float64EntryType {
                        return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                    }
                    binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value))
                    n += 8
                case IntegerValue:
                    if curType != integerEntryType {
                        return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                    }
                    binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
                    n += 8
                case UnsignedValue:
                    if curType != unsignedEntryType {
                        return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                    }
                    binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
                    n += 8
                case BooleanValue:
                    if curType != booleanEntryType {
                        return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                    }
                    if vv.value {
                        dst[n] = 1
                    } else {
                        dst[n] = 0
                    }
                    n++
                case StringValue:
                    if curType != stringEntryType {
                        return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
                    }
                    binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.value)))
                    n += 4
                    n += copy(dst[n:], vv.value)
                default:
                    return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv)
                }
            }
        }
    
        return dst[:n], nil
    }
    
    DeleteWALEntry
    • 删除Series的WALEntry
    • 定义:
    type DeleteWALEntry struct {
        Keys [][]byte
        sz   int
    }
    
    • 结构图


      influxdb_wal_delete_entry.png
    • 编码Encode, 各个key间以\n分隔

    func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) {
        sz := w.MarshalSize()
    
        if len(dst) < sz {
            dst = make([]byte, sz)
        }
    
        var n int
        for _, k := range w.Keys {
            n += copy(dst[n:], k)
            n += copy(dst[n:], "\n")
        }
    
        // We return n-1 to strip off the last newline so that unmarshalling the value
        // does not produce an empty string
        return []byte(dst[:n-1]), nil
    }
    
    DeleteRangeWALEntry
    • 删除某个时间范围内的series的WALEntry
    • 定义:
    type DeleteRangeWALEntry struct {
        Keys     [][]byte
        Min, Max int64  // 开始时间戳和结束时间戳
        sz       int
    }
    
    • 结构图


      influxdb_delete_ragne_wal_entry.png
    • 编码Encode
    func (w *DeleteRangeWALEntry) Encode(b []byte) ([]byte, error) {
        sz := w.MarshalSize()
    
        if len(b) < sz {
            b = make([]byte, sz)
        }
    
        // 写入开始和结束时间戳
        binary.BigEndian.PutUint64(b[:8], uint64(w.Min))
        binary.BigEndian.PutUint64(b[8:16], uint64(w.Max))
    
        i := 16
        // 逐一写入key
        for _, k := range w.Keys {
            binary.BigEndian.PutUint32(b[i:i+4], uint32(len(k)))
            i += 4
            i += copy(b[i:], k)
        }
    
        return b[:i], nil
    }
    

    WALEntry的写入

    • 上面我们介绍了三种WALEntry,在序列化后就可以被写入到WALSegment文件中了,在写之前可能还需要作压缩
    • 写入时候为了读取时便于解析,还需要按一定格式写入
      1. 先写入 1字节 的 WALEntry类型
      2. 再写入 4字节 的 序列化后且作了压缩的WALEntry的长度
      3. 最后写入 序列化后且作了压缩的WALEntry的具体内容
    • 使用 WALSegmentWriter类来写入:
    func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
        var buf [5]byte
        // 写入类型和具体内容的长度
        buf[0] = byte(entryType)
        binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))
    
        if _, err := w.bw.Write(buf[:]); err != nil {
            return err
        }
    
        // 写入具体内容
        if _, err := w.bw.Write(compressed); err != nil {
            return err
        }
    
        w.size += len(buf) + len(compressed)
    
        return nil
    }
    

    WAL

    WAL封装了一个预写日志的所有操作,正如前面提到了,一个Shard对应一个WAL,一个WAL在写入时又会产生多个WALSegment。
    我们来分析一下一些主要的方法:

    Open操作

    遍历一个Shard目录下的所有Segment文件,这些文件按id从小到大排序,作初始化操作

    func (l *WAL) Open() error {
        l.mu.Lock()
        defer l.mu.Unlock()
    ..
    
        if err := os.MkdirAll(l.path, 0777); err != nil {
            return err
        }
    
        // 获取所有segment 文件列表,按id从小到大排序,最后一个就是当前正写入的文件 
        segments, err := segmentFileNames(l.path)
        if err != nil {
            return err
        }
    
        if len(segments) > 0 {
            // 最后一个就是当前正写入的文件
            lastSegment := segments[len(segments)-1]
            
            // 获取最新的segment id
            id, err := idFromFileName(lastSegment)
            if err != nil {
                return err
            }
    
            // 初始化当前的segment id
            l.currentSegmentID = id
            stat, err := os.Stat(lastSegment)
            if err != nil {
                return err
            }
    
            if stat.Size() == 0 {
                // 如果文件大小为0, 删除
                os.Remove(lastSegment)
                segments = segments[:len(segments)-1]
            } else {
                //为写入,打开该文件 
                fd, err := os.OpenFile(lastSegment, os.O_RDWR, 0666)
                if err != nil {
                    return err
                }
                if _, err := fd.Seek(0, io.SeekEnd); err != nil {
                    return err
                }
                
                // 初始化当前的SegmentWriter
                l.currentSegmentWriter = NewWALSegmentWriter(fd)
    
                // Reset the current segment size stat
                atomic.StoreInt64(&l.stats.CurrentBytes, stat.Size())
            }
        }
    
        ...
        
        l.closing = make(chan struct{})
    
        return nil
    }
    
    writeToLog写入操作
    func (l *WAL) writeToLog(entry WALEntry) (int, error) {
        // 从buytesPool获取byte slice, 避免反复重新分配内存
        bytes := bytesPool.Get(entry.MarshalSize())
    
        // 将entry作编码,前面已经介绍过
        b, err := entry.Encode(bytes)
        if err != nil {
            bytesPool.Put(bytes)
            return -1, err
        }
    
        // 使用snappy压缩强词编码后的entry内容
        encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))
    
        compressed := snappy.Encode(encBuf, b)
        bytesPool.Put(bytes)
    
        syncErr := make(chan error)
    
        segID, err := func() (int, error) {
            l.mu.Lock()
            defer l.mu.Unlock()
    
            // Make sure the log has not been closed
            select {
            case <-l.closing:
                return -1, ErrWALClosed
            default:
            }
    
            // roll the segment file if needed
            if err := l.rollSegment(); err != nil {
                return -1, fmt.Errorf("error rolling WAL segment: %v", err)
            }
    
            // write and sync
            // 使用SegmentWriter来写入entry内容
            if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
                return -1, fmt.Errorf("error writing WAL entry: %v", err)
            }
    
            select {
            case l.syncWaiters <- syncErr:
            default:
                return -1, fmt.Errorf("error syncing wal")
            }
            
            // 将执行file sync操作,刷到磁盘文件 
            l.scheduleSync()
    
            // Update stats for current segment size
            atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))
    
            l.lastWriteTime = time.Now().UTC()
    
            return l.currentSegmentID, nil
        }()
    
        bytesPool.Put(encBuf)
    
        if err != nil {
            return segID, err
        }
    
        // schedule an fsync and wait for it to complete
        return segID, <-syncErr
    }
    

    Cache

    1. 时序数据在写入时,会先写入到上面介绍的WAL,然后写入到Cache,最后按照一定的策略Flush到磁盘文件。现在我们来介绍这个Cache。
    2. 这个Cache里缓存的是什么呢?
    3. 这个Cache用什么结构来作内存存储?
      我们下面来一一解答这些问题:
    Entry
    • 既然是Cache,那肯定是key-value结构,其中的key是series key + field name, 对应的value就是这个key所对应的若干个field value的集合,也就组合成了一个entry,我们来看下entry的定义:
    type entry struct {
        mu     sync.RWMutex
        values Values // All stored values.
    
        // The type of values stored. Read only so doesn't need to be protected by
        // mu.
        vtype byte
    }
    

    由这个定义我们可知,同一个entry里面的所有value的类型都是相同的,都是这个 vtype里所保存的类型。

    • Entry的创建:使用[]Value来创建,比较简单,但需要先判断这组value的类型是否一致
    func newEntryValues(values []Value) (*entry, error) {
        e := &entry{}
        e.values = make(Values, 0, len(values))
        e.values = append(e.values, values...)
    
        // No values, don't check types and ordering
        if len(values) == 0 {
            return e, nil
        }
    
        // 个人感觉应该先校验这组value的类型是否一致,不一致就不要作上面的make, append了。
        et := valueType(values[0])
        for _, v := range values {
            // Make sure all the values are the same type
            if et != valueType(v) {
                return nil, tsdb.ErrFieldTypeConflict
            }
        }
    
        // Set the type of values stored.
        e.vtype = et
    
        return e, nil
    }
    
    • Entry的add操作:和上面的创建类似,需要先判断这组value的类型是否一致
    • Entry的去重操作,去掉Values中时间戳相同的value,只保留其中的一个
    func (e *entry) deduplicate() {
        e.mu.Lock()
        defer e.mu.Unlock()
    
        if len(e.values) <= 1 {
            return
        }
        e.values = e.values.Deduplicate()
    }
    

    实际上是调用了Values.Deduplicate,这个Values提供了若干实用的方法,比如去掉,过滤等。

    • Entry过滤:过滤掉在给定时间戳范围内的Value
    func (e *entry) filter(min, max int64) {
        e.mu.Lock()
        if len(e.values) > 1 {
            e.values = e.values.Deduplicate()
        }
        e.values = e.values.Exclude(min, max)
        e.mu.Unlock()
    }
    

    实际上是调用了Values.Exclude

    storer
    • 上面解决了Cache存什么的问题,下面我们来解决怎么存的问题。这个存储器是storer,它是个interface,之前是实现了这个interface的struct都可以用来存Cache里的entry,我们先来看一下这个interface
    type storer interface {
        entry(key []byte) *entry                        // Get an entry by its key.
        write(key []byte, values Values) (bool, error)  // Write an entry to the store.
        add(key []byte, entry *entry)                   // Add a new entry to the store.
        remove(key []byte)                              // Remove an entry from the store.
        keys(sorted bool) [][]byte                      // Return an optionally sorted slice of entry keys.
        apply(f func([]byte, *entry) error) error       // Apply f to all entries in the store in parallel.
        applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
        reset()                                         // Reset the store to an initial unused state.
        split(n int) []storer                           // Split splits the store into n stores
        count() int                                     // Count returns the number of keys in the store
    }
    

    注释很清晰,我们这里不累述。

    • 那实际是用什么来存的呢? influxdb里实现了ring,它实现了这个storer的所有接口,定义在tsdb/engine/tsm1/ring.go中。简单来说,一个ring内部分为若干个固定数量的桶,这里叫partition, 一个key进来后,按key作hash,对桶的数量取模,确定好要存在哪个桶里,然后每个桶其实又是一个map,最后也就是将key-value存在这个桶的map里。因为cache的添加,读取可能很频繁且都需要加锁,分桶后,各个桶单独加锁,提升性能。代码很简单,这里不详述了。

    相关文章

      网友评论

        本文标题:Influxdb中TSM文件结构解析之WAL

        本文链接:https://www.haomeiwen.com/subject/vysfrqtx.html