美文网首页
聊聊promtail的positions

聊聊promtail的positions

作者: go4it | 来源:发表于2021-01-21 23:53 被阅读0次

    本文主要研究一下promtail的positions

    Positions

    loki/pkg/promtail/positions/positions.go

    type Positions interface {
        // GetString returns how far we've through a file as a string.
        // JournalTarget writes a journal cursor to the positions file, while
        // FileTarget writes an integer offset. Use Get to read the integer
        // offset.
        GetString(path string) string
        // Get returns how far we've read through a file. Returns an error
        // if the value stored for the file is not an integer.
        Get(path string) (int64, error)
        // PutString records (asynchronously) how far we've read through a file.
        // Unlike Put, it records a string offset and is only useful for
        // JournalTargets which doesn't have integer offsets.
        PutString(path string, pos string)
        // Put records (asynchronously) how far we've read through a file.
        Put(path string, pos int64)
        // Remove removes the position tracking for a filepath
        Remove(path string)
        // SyncPeriod returns how often the positions file gets resynced
        SyncPeriod() time.Duration
        // Stop the Position tracker.
        Stop()
    }
    

    Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法

    positions

    loki/pkg/promtail/positions/positions.go

    // Positions tracks how far through each file we've read.
    type positions struct {
        logger    log.Logger
        cfg       Config
        mtx       sync.Mutex
        positions map[string]string
        quit      chan struct{}
        done      chan struct{}
    }
    
    func (p *positions) Stop() {
        close(p.quit)
        <-p.done
    }
    
    func (p *positions) PutString(path string, pos string) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        p.positions[path] = pos
    }
    
    func (p *positions) Put(path string, pos int64) {
        p.PutString(path, strconv.FormatInt(pos, 10))
    }
    
    func (p *positions) GetString(path string) string {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        return p.positions[path]
    }
    
    func (p *positions) Get(path string) (int64, error) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        pos, ok := p.positions[path]
        if !ok {
            return 0, nil
        }
        return strconv.ParseInt(pos, 10, 64)
    }
    
    func (p *positions) Remove(path string) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        p.remove(path)
    }
    
    func (p *positions) remove(path string) {
        delete(p.positions, path)
    }
    
    func (p *positions) SyncPeriod() time.Duration {
        return p.cfg.SyncPeriod
    }
    

    positions定义了logger、cfg、mtx、positions、quit、done属性;它实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除

    New

    loki/pkg/promtail/positions/positions.go

    // New makes a new Positions.
    func New(logger log.Logger, cfg Config) (Positions, error) {
        positionData, err := readPositionsFile(cfg, logger)
        if err != nil {
            return nil, err
        }
    
        p := &positions{
            logger:    logger,
            cfg:       cfg,
            positions: positionData,
            quit:      make(chan struct{}),
            done:      make(chan struct{}),
        }
    
        go p.run()
        return p, nil
    }
    

    New方法会通过readPositionsFile读取positionData创建positions,然后异步执行p.run()

    run

    loki/pkg/promtail/positions/positions.go

    func (p *positions) run() {
        defer func() {
            p.save()
            level.Debug(p.logger).Log("msg", "positions saved")
            close(p.done)
        }()
    
        ticker := time.NewTicker(p.cfg.SyncPeriod)
        for {
            select {
            case <-p.quit:
                return
            case <-ticker.C:
                p.save()
                p.cleanup()
            }
        }
    }
    
    func (p *positions) save() {
        if p.cfg.ReadOnly {
            return
        }
        p.mtx.Lock()
        positions := make(map[string]string, len(p.positions))
        for k, v := range p.positions {
            positions[k] = v
        }
        p.mtx.Unlock()
    
        if err := writePositionFile(p.cfg.PositionsFile, positions); err != nil {
            level.Error(p.logger).Log("msg", "error writing positions file", "error", err)
        }
    }
    
    func (p *positions) cleanup() {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        toRemove := []string{}
        for k := range p.positions {
            // If the position file is prefixed with journal, it's a
            // JournalTarget cursor and not a file on disk.
            if strings.HasPrefix(k, "journal-") {
                continue
            }
    
            if _, err := os.Stat(k); err != nil {
                if os.IsNotExist(err) {
                    // File no longer exists.
                    toRemove = append(toRemove, k)
                } else {
                    // Can't determine if file exists or not, some other error.
                    level.Warn(p.logger).Log("msg", "could not determine if log file "+
                        "still exists while cleaning positions file", "error", err)
                }
            }
        }
        for _, tr := range toRemove {
            p.remove(tr)
        }
    }
    

    run方法通过time.NewTicker(p.cfg.SyncPeriod)来触发执行p.save()及p.cleanup();save方法将positions持久化到文件;cleanup方法遍历p.positions,从内存中移除文件不存在的position

    readPositionsFile

    loki/pkg/promtail/positions/positions.go

    func readPositionsFile(cfg Config, logger log.Logger) (map[string]string, error) {
    
        cleanfn := filepath.Clean(cfg.PositionsFile)
        buf, err := ioutil.ReadFile(cleanfn)
        if err != nil {
            if os.IsNotExist(err) {
                return map[string]string{}, nil
            }
            return nil, err
        }
    
        var p File
        err = yaml.UnmarshalStrict(buf, &p)
        if err != nil {
            // return empty if cfg option enabled
            if cfg.IgnoreInvalidYaml {
                level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", cleanfn, "error", err)
                return map[string]string{}, nil
            }
    
            return nil, fmt.Errorf("invalid yaml positions file [%s]: %v", cleanfn, err)
        }
    
        // p.Positions will be nil if the file exists but is empty
        if p.Positions == nil {
            p.Positions = map[string]string{}
        }
    
        return p.Positions, nil
    }
    

    readPositionsFile方法从文件读取位置到p.Positions

    writePositionFile

    loki/pkg/promtail/positions/positions.go

    func writePositionFile(filename string, positions map[string]string) error {
        buf, err := yaml.Marshal(File{
            Positions: positions,
        })
        if err != nil {
            return err
        }
    
        target := filepath.Clean(filename)
        temp := target + "-new"
    
        err = ioutil.WriteFile(temp, buf, os.FileMode(positionFileMode))
        if err != nil {
            return err
        }
    
        return os.Rename(temp, target)
    }
    

    writePositionFile方法将positions写入文件

    小结

    promtail的Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法;positions实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除。

    doc

    相关文章

      网友评论

          本文标题:聊聊promtail的positions

          本文链接:https://www.haomeiwen.com/subject/bubjzktx.html