美文网首页
prometheus tsdb部分源码阅读之wal.LogSer

prometheus tsdb部分源码阅读之wal.LogSer

作者: 在蜀山 | 来源:发表于2018-11-05 19:32 被阅读0次

    从两周前我就开始阅读prometheus时序数据库存储部分。此前我采用了顺序阅读的方式,虽然这种方式遵循着代码执行顺序,但经常由于不理解各模块的作用,以及各模块调用过程又比较复杂,整个阅读过程困难重重。必须改变阅读方式,模块化地阅读代码,或者才是正道。也决心写点文章,记录下阅读代码的过程。
    今天首先阅读的wal.go的代码,这部分代码是WriteAheadLog部分的代码实现。

    // WAL is a write ahead log that can log new series labels and samples.
    // It must be completely read before new entries are logged.
    type WAL interface {
        Reader() WALReader
        LogSeries([]RefSeries) error
        LogSamples([]RefSample) error
        LogDeletes([]Stone) error
        Truncate(mint int64, keep func(uint64) bool) error
        Close() error
    }
    

    该接口有两个实现noWAL和SegmentWAL。noWAL即什么也不干的wal,所有方法几乎都是直接return,重点来看下SegmentWAL。

    // SegmentWAL is a write ahead log for series data.
    type SegmentWAL struct {
        mtx     sync.Mutex
        metrics *walMetrics
    
        dirFile *os.File
        files   []*segmentFile
    
        logger        log.Logger
        flushInterval time.Duration
        segmentSize   int64
    
        crc32 hash.Hash32
        cur   *bufio.Writer
        curN  int64
    
        stopc   chan struct{}
        donec   chan struct{}
        actorc  chan func() error // sequentialized background operations
        buffers sync.Pool
    }
    

    先不要去管这些结构体成员, 等后面方法中用到了再去理解,这里贴图只是为了后面代码提到的时候方便翻阅。首先来看看SegementWAL实现的func (w *SegmentWAL)LogSeries(series []RefSeries)error 方法。这个方法接收一个slice,类型为结构体RefSeries,看名字,这个结构体似乎是用户指向一个Series,下图给出了这个结构体的定义。

    // RefSeries is the series labels with the series ID.
    type RefSeries struct {
        Ref    uint64
        Labels labels.Labels
    }
    

    结构很简单,由一个uint64和lables.labels组成。Labels是时序数据的标签,其实质是KV组成的数组,prometheus里面将时序数据的metric也包含在Labels里面,即name: $name的形式。也就是说Lables能够唯一地表示一条时间序列。uint64则是这条时间序列的id。也就是说LogSeries方法就是存储标签名称以及id使用的。接下来看看这个方法是如何实现的。

    // LogSeries writes a batch of new series labels to the log.
    // The series have to be ordered.
    func (w *SegmentWAL) LogSeries(series []RefSeries) error {
        //获得一个buffer,至于buffer如何实现,后面再表
        buf := w.getBuffer()
        //将series存储到buffer,返回一个uint8类型的flag
        flag := w.encodeSeries(buf, series)
        //加锁,读写可能会竞争
        w.mtx.Lock()
        defer w.mtx.Unlock()
        //将buf内的内容写入到xx
        err := w.write(WALEntrySeries, flag, buf.get())
    
        w.putBuffer(buf)
    
        if err != nil {
            return errors.Wrap(err, "log series")
        }
    
        tf := w.head()
    
        for _, s := range series {
            if tf.minSeries > s.Ref {
                tf.minSeries = s.Ref
            }
        }
        return nil
    }
    

    这里用到了几个SegmentWAL结构体成员变量,首先是

    type SegmentWAL struct {
        ......
        buffers sync.Pool
    }
    

    sync.Pool是官方的包,这里给出一段介绍

    众所周知,go是自动垃圾回收的(garbage collector),这大大减少了程序编程负担。但gc是一把双刃剑,带来了编程的方便但同时也增加了运行时开销,使用不当甚至会严重影响程序的性能。因此性能要求高的场景不能任意产生太多的垃圾(有gc但又不能完全依赖它挺恶心的),如何解决呢?那就是要重用对象了,我们可以简单的使用一个chan把这些可重用的对象缓存起来,但如果很多goroutine竞争一个chan性能肯定是问题.....由于golang团队认识到这个问题普遍存在,为了避免大家重造车轮,因此官方统一出了一个包Pool。原文:https://blog.csdn.net/yongjian_lian/article/details/42058893

    简单地说,就是缓存对象, 不会被gc清理掉。需要用的时候,再取出来。这里重用的对象或者说结构体是encbuf,是一个结构体,

    // encbuf is a helper type to populate a byte slice with various types.
    type encbuf struct {
        b []byte
        c [binary.MaxVarintLen64]byte
    }
    

    encbuf使用了binary包,简单的数字与字节序列的转换以及变长值的编解码,prometheus采用了BigEndian方式进行编解码。下面来看下encodeSeries方法

    func (w *SegmentWAL) encodeSeries(buf *encbuf, series []RefSeries) uint8 {
        for _, s := range series {
            buf.putBE64(s.Ref)
            buf.putUvarint(len(s.Labels))
    
            for _, l := range s.Labels {
                buf.putUvarintStr(l.Name)
                buf.putUvarintStr(l.Value)
            }
        }
        return walSeriesSimple
    }
    

    对于多组RefSeries,首先写入series的RefId,然后用变长编码写入labels个数,最后用变长字符串的形式分别写入标签的名字和值。
    写到buf里面之后,调用SegmentWAL的write方法

    func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
        // Cut to the next segment if the entry exceeds the file size unless it would also
        // exceed the size of a new segment.
        // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
        var (
            sz    = int64(len(buf)) + 6
            newsz = w.curN + sz
        )
        // XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
        // Probably fine in general but may yield a lot of short files in some cases.
        if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
            if err := w.cut(); err != nil {
                return err
            }
        }
        n, err := w.writeTo(w.cur, w.crc32, t, flag, buf)
    
        w.curN += int64(n)
    
        return err
    }
    
    func (w *SegmentWAL) writeTo(wr io.Writer, crc32 hash.Hash, t WALEntryType, flag uint8, buf []byte) (int, error) {
        if len(buf) == 0 {
            return 0, nil
        }
        crc32.Reset()
        wr = io.MultiWriter(crc32, wr)
    
        var b [6]byte
        b[0] = byte(t)
        b[1] = flag
    
        binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))
    
        n1, err := wr.Write(b[:])
        if err != nil {
            return n1, err
        }
        n2, err := wr.Write(buf)
        if err != nil {
            return n1 + n2, err
        }
        n3, err := wr.Write(crc32.Sum(b[:0]))
    
        return n1 + n2 + n3, err
    }
    

    首先来看一下一个结构体成员

    type SegmentWAL struct {
        ......
        cur   *bufio.Writer
        curN  int64
        segmentSize   int64 // default 256 * 1024 * 1024 // 256 MB
    }
    

    先将这个bufio.Writer如何创建的过程暂且不表,只要知道这是一个bufio.Writer类型的即可。回到SegmentWAL的write方法中。这个方法主要执行的是w.writeTo()方法,但是当条件一成立的时候,先执行w.cut()方法。这个条件解释如下:

    w.cur为nil
    m.curN超过了最大的segmentSize,
    加上新数据的长度超过segmentSize且新数据规模小于segmentSize

    只要符合其中一个条件,就会执行w.cut(),接着再执行wr.WriteTo()方法。在wr.WriteTo()方法中主要有几个知识点:

    crc32包实现了32位循环冗余校验(CRC-32)的校验和算法,参见:http://en.wikipedia.org/wiki/Cyclic_redundancy_check
    wr = io.MultiWriter(crc32, wr), 类似于linux tee命令,详见http://man.linuxde.net/tee
    写入到wr中的是这么五段 WALEntryType类型+ flag + buf长度 + buf本身 + 校验和
    返回写入的总长度

    由于每个segmentFile不能超过预设的大小,因此会有一个切分的过程,具体代码在func (w *SegmentWAL) cut() error 函数中。这个函数主要干了两件事情

    1. 保存已有的segmentFile,保证数据写入到磁盘并且关闭
    2. 创建新的segmentFile
      对于第一点,首先是执行w.Flush(),也就是执行w.cur.Flush(),实际执行的是bufio中Writer.Flush()方法,也就是将bufio Writer的缓存刷新到底层的io.Writer中去,调用底层io.Writer的write方法,同时处理了一把shortWrite的情况,也就是返回的已写入字数少于要求写入的,具体处理方法见代码
    func (b *Writer) Flush() error {
        if b.err != nil {
            return b.err
        }
        if b.n == 0 {
            return nil
        }
        n, err := b.wr.Write(b.buf[0:b.n])
        if n < b.n && err == nil {
            err = io.ErrShortWrite
        }
        if err != nil {
            if n > 0 && n < b.n {
                copy(b.buf[0:b.n-n], b.buf[n:b.n])
            }
            b.n -= n
            b.err = err
            return err
        }
        b.n = 0
        return nil
    }
    

    这步处理好之后,发送了一个函数到w.actorc,

    go func() {
                w.actorc <- func() error {
                    off, err := hf.Seek(0, io.SeekCurrent)
                    if err != nil {
                        return errors.Wrapf(err, "finish old segment %s", hf.Name())
                    }
                    if err := hf.Truncate(off); err != nil {
                        return errors.Wrapf(err, "finish old segment %s", hf.Name())
                    }
                    if err := hf.Sync(); err != nil {
                        return errors.Wrapf(err, "finish old segment %s", hf.Name())
                    }
                    if err := hf.Close(); err != nil {
                        return errors.Wrapf(err, "finish old segment %s", hf.Name())
                    }
                    return nil
                }
            }()
    

    第一个遇到的函数是hf.Seek(0, io.SeekCurrent), hf是head file的缩写,就当前的file,执行seek(文档链接)操作,0表示位移量,io.SeekCurrent表示从当前位置,这样不就是不移动的意思么?我也不知道操作的意义,可能是为了获得当前的偏移量-变量off吧。

    然后执行hf.Truncate(off),对应文档链接,也就是将文档截断为off指定的大小,似乎用在日志截断比较多,要注意的是,不会改变io offset的位置,很好奇,如果off为0,同时io offset在非0位置,再进行读写会怎么样,改天可以测试一下。这步操作似乎就是在当前读写位置进行截断,和文件缓存难道有关系么?再回去看下linux文件读写部分的知识。

    接着执行hf.Sync(),也就是linux文件读写的fsync操作,将内核中缓存的修改过的数据刷新到磁盘上去,为啥要进行先truncate再sync的操作,得理解内核对于文件读写的原理才行。

    最后就是文件的close()操作。

    接下去就是创建新的segment文件。segment文件的命名规则是%0.6d,即000001 000002的方式,具体创建代码如下:

    // createSegmentFile creates a new segment file with the given name. It preallocates
    // the standard segment size if possible and writes the header.
    func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) {
        f, err := os.Create(name)
        if err != nil {
            return nil, err
        }
            //预分配那么的空间
        if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
            return nil, err
        }
            //写入元信息,
        // Write header metadata for new file.
        metab := make([]byte, 8)
        binary.BigEndian.PutUint32(metab[:4], WALMagic)
        metab[4] = WALFormatDefault
    
        if _, err := f.Write(metab); err != nil {
            return nil, err
        }
        return f, err
    }
    

    创建好之后,给通道w.actorc传一个函数,用于w.dirFile的修改内容刷新到磁盘上,调用了fsync系统调用。代码如下

    go func() {
            w.actorc <- func() error {
                return errors.Wrap(w.dirFile.Sync(), "sync WAL directory")
            }
        }()
    

    最后将os.File对象包装成segmentFile结构体,放入到w.files数组中,再更新w.cur和curN成员,代码如下

    w.cur = bufio.NewWriterSize(f, 8*1024*1024)
    w.curN = 8
    

    至此,只是完成了利用wal将记录先写到log的过程,并没有真正地写入到时序数据库中.

    相关文章

      网友评论

          本文标题:prometheus tsdb部分源码阅读之wal.LogSer

          本文链接:https://www.haomeiwen.com/subject/vpyfxqtx.html