从两周前我就开始阅读prometheus时序数据库存储部分。此前我采用了顺序阅读的方式,虽然这种方式遵循着代码执行顺序,但经常由于不理解各模块的作用,以及各模块调用过程又比较复杂,整个阅读过程困难重重。必须改变阅读方式,模块化地阅读代码,或者才是正道。也决心写点文章,记录下阅读代码的过程。
今天首先阅读的wal.go的代码,这部分代码是WriteAheadLog部分的代码实现。
// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
type WAL interface {
Reader() WALReader
LogSeries([]RefSeries) error
LogSamples([]RefSample) error
LogDeletes([]Stone) error
Truncate(mint int64, keep func(uint64) bool) error
Close() error
}
该接口有两个实现noWAL和SegmentWAL。noWAL即什么也不干的wal,所有方法几乎都是直接return,重点来看下SegmentWAL。
// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
mtx sync.Mutex
metrics *walMetrics
dirFile *os.File
files []*segmentFile
logger log.Logger
flushInterval time.Duration
segmentSize int64
crc32 hash.Hash32
cur *bufio.Writer
curN int64
stopc chan struct{}
donec chan struct{}
actorc chan func() error // sequentialized background operations
buffers sync.Pool
}
先不要去管这些结构体成员, 等后面方法中用到了再去理解,这里贴图只是为了后面代码提到的时候方便翻阅。首先来看看SegementWAL实现的func (w *SegmentWAL)LogSeries(series []RefSeries)error 方法。这个方法接收一个slice,类型为结构体RefSeries,看名字,这个结构体似乎是用户指向一个Series,下图给出了这个结构体的定义。
// RefSeries is the series labels with the series ID.
type RefSeries struct {
Ref uint64
Labels labels.Labels
}
结构很简单,由一个uint64和lables.labels组成。Labels是时序数据的标签,其实质是KV组成的数组,prometheus里面将时序数据的metric也包含在Labels里面,即name: $name的形式。也就是说Lables能够唯一地表示一条时间序列。uint64则是这条时间序列的id。也就是说LogSeries方法就是存储标签名称以及id使用的。接下来看看这个方法是如何实现的。
// LogSeries writes a batch of new series labels to the log.
// The series have to be ordered.
func (w *SegmentWAL) LogSeries(series []RefSeries) error {
//获得一个buffer,至于buffer如何实现,后面再表
buf := w.getBuffer()
//将series存储到buffer,返回一个uint8类型的flag
flag := w.encodeSeries(buf, series)
//加锁,读写可能会竞争
w.mtx.Lock()
defer w.mtx.Unlock()
//将buf内的内容写入到xx
err := w.write(WALEntrySeries, flag, buf.get())
w.putBuffer(buf)
if err != nil {
return errors.Wrap(err, "log series")
}
tf := w.head()
for _, s := range series {
if tf.minSeries > s.Ref {
tf.minSeries = s.Ref
}
}
return nil
}
这里用到了几个SegmentWAL结构体成员变量,首先是
type SegmentWAL struct {
......
buffers sync.Pool
}
sync.Pool是官方的包,这里给出一段介绍
众所周知,go是自动垃圾回收的(garbage collector),这大大减少了程序编程负担。但gc是一把双刃剑,带来了编程的方便但同时也增加了运行时开销,使用不当甚至会严重影响程序的性能。因此性能要求高的场景不能任意产生太多的垃圾(有gc但又不能完全依赖它挺恶心的),如何解决呢?那就是要重用对象了,我们可以简单的使用一个chan把这些可重用的对象缓存起来,但如果很多goroutine竞争一个chan性能肯定是问题.....由于golang团队认识到这个问题普遍存在,为了避免大家重造车轮,因此官方统一出了一个包Pool。原文:https://blog.csdn.net/yongjian_lian/article/details/42058893
简单地说,就是缓存对象, 不会被gc清理掉。需要用的时候,再取出来。这里重用的对象或者说结构体是encbuf,是一个结构体,
// encbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
b []byte
c [binary.MaxVarintLen64]byte
}
encbuf使用了binary包,简单的数字与字节序列的转换以及变长值的编解码,prometheus采用了BigEndian方式进行编解码。下面来看下encodeSeries方法
func (w *SegmentWAL) encodeSeries(buf *encbuf, series []RefSeries) uint8 {
for _, s := range series {
buf.putBE64(s.Ref)
buf.putUvarint(len(s.Labels))
for _, l := range s.Labels {
buf.putUvarintStr(l.Name)
buf.putUvarintStr(l.Value)
}
}
return walSeriesSimple
}
对于多组RefSeries,首先写入series的RefId,然后用变长编码写入labels个数,最后用变长字符串的形式分别写入标签的名字和值。
写到buf里面之后,调用SegmentWAL的write方法
func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
// Cut to the next segment if the entry exceeds the file size unless it would also
// exceed the size of a new segment.
// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
var (
sz = int64(len(buf)) + 6
newsz = w.curN + sz
)
// XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
// Probably fine in general but may yield a lot of short files in some cases.
if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
if err := w.cut(); err != nil {
return err
}
}
n, err := w.writeTo(w.cur, w.crc32, t, flag, buf)
w.curN += int64(n)
return err
}
func (w *SegmentWAL) writeTo(wr io.Writer, crc32 hash.Hash, t WALEntryType, flag uint8, buf []byte) (int, error) {
if len(buf) == 0 {
return 0, nil
}
crc32.Reset()
wr = io.MultiWriter(crc32, wr)
var b [6]byte
b[0] = byte(t)
b[1] = flag
binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))
n1, err := wr.Write(b[:])
if err != nil {
return n1, err
}
n2, err := wr.Write(buf)
if err != nil {
return n1 + n2, err
}
n3, err := wr.Write(crc32.Sum(b[:0]))
return n1 + n2 + n3, err
}
首先来看一下一个结构体成员
type SegmentWAL struct {
......
cur *bufio.Writer
curN int64
segmentSize int64 // default 256 * 1024 * 1024 // 256 MB
}
先将这个bufio.Writer如何创建的过程暂且不表,只要知道这是一个bufio.Writer类型的即可。回到SegmentWAL的write方法中。这个方法主要执行的是w.writeTo()方法,但是当条件一成立的时候,先执行w.cut()方法。这个条件解释如下:
w.cur为nil
m.curN超过了最大的segmentSize,
加上新数据的长度超过segmentSize且新数据规模小于segmentSize
只要符合其中一个条件,就会执行w.cut(),接着再执行wr.WriteTo()方法。在wr.WriteTo()方法中主要有几个知识点:
crc32包实现了32位循环冗余校验(CRC-32)的校验和算法,参见:http://en.wikipedia.org/wiki/Cyclic_redundancy_check
wr = io.MultiWriter(crc32, wr), 类似于linux tee命令,详见http://man.linuxde.net/tee
写入到wr中的是这么五段 WALEntryType类型+ flag + buf长度 + buf本身 + 校验和
返回写入的总长度
由于每个segmentFile不能超过预设的大小,因此会有一个切分的过程,具体代码在func (w *SegmentWAL) cut() error 函数中。这个函数主要干了两件事情
- 保存已有的segmentFile,保证数据写入到磁盘并且关闭
- 创建新的segmentFile
对于第一点,首先是执行w.Flush(),也就是执行w.cur.Flush(),实际执行的是bufio中Writer.Flush()方法,也就是将bufio Writer的缓存刷新到底层的io.Writer中去,调用底层io.Writer的write方法,同时处理了一把shortWrite的情况,也就是返回的已写入字数少于要求写入的,具体处理方法见代码
func (b *Writer) Flush() error {
if b.err != nil {
return b.err
}
if b.n == 0 {
return nil
}
n, err := b.wr.Write(b.buf[0:b.n])
if n < b.n && err == nil {
err = io.ErrShortWrite
}
if err != nil {
if n > 0 && n < b.n {
copy(b.buf[0:b.n-n], b.buf[n:b.n])
}
b.n -= n
b.err = err
return err
}
b.n = 0
return nil
}
这步处理好之后,发送了一个函数到w.actorc,
go func() {
w.actorc <- func() error {
off, err := hf.Seek(0, io.SeekCurrent)
if err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Truncate(off); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Sync(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Close(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
return nil
}
}()
第一个遇到的函数是hf.Seek(0, io.SeekCurrent), hf是head file的缩写,就当前的file,执行seek(文档链接)操作,0表示位移量,io.SeekCurrent表示从当前位置,这样不就是不移动的意思么?我也不知道操作的意义,可能是为了获得当前的偏移量-变量off吧。
然后执行hf.Truncate(off),对应文档链接,也就是将文档截断为off指定的大小,似乎用在日志截断比较多,要注意的是,不会改变io offset的位置,很好奇,如果off为0,同时io offset在非0位置,再进行读写会怎么样,改天可以测试一下。这步操作似乎就是在当前读写位置进行截断,和文件缓存难道有关系么?再回去看下linux文件读写部分的知识。
接着执行hf.Sync(),也就是linux文件读写的fsync操作,将内核中缓存的修改过的数据刷新到磁盘上去,为啥要进行先truncate再sync的操作,得理解内核对于文件读写的原理才行。
最后就是文件的close()操作。
接下去就是创建新的segment文件。segment文件的命名规则是%0.6d,即000001 000002的方式,具体创建代码如下:
// createSegmentFile creates a new segment file with the given name. It preallocates
// the standard segment size if possible and writes the header.
func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) {
f, err := os.Create(name)
if err != nil {
return nil, err
}
//预分配那么的空间
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
return nil, err
}
//写入元信息,
// Write header metadata for new file.
metab := make([]byte, 8)
binary.BigEndian.PutUint32(metab[:4], WALMagic)
metab[4] = WALFormatDefault
if _, err := f.Write(metab); err != nil {
return nil, err
}
return f, err
}
创建好之后,给通道w.actorc传一个函数,用于w.dirFile的修改内容刷新到磁盘上,调用了fsync系统调用。代码如下
go func() {
w.actorc <- func() error {
return errors.Wrap(w.dirFile.Sync(), "sync WAL directory")
}
}()
最后将os.File对象包装成segmentFile结构体,放入到w.files数组中,再更新w.cur和curN成员,代码如下
w.cur = bufio.NewWriterSize(f, 8*1024*1024)
w.curN = 8
至此,只是完成了利用wal将记录先写到log的过程,并没有真正地写入到时序数据库中.
网友评论