美文网首页
Prometheus数据抓取及数据存储实现

Prometheus数据抓取及数据存储实现

作者: 天生小包 | 来源:发表于2020-04-09 11:30 被阅读0次

    Goroutine生命周期

    Prometheus使用一种通用的Goroutine生命周期的管理机制oklog的run.Group。

    // Croup 收集 actors 函数然后并行运行它们;当一个 actor(函数)返回,所有 actors 被 interrupted
    type Group struct {
      actors []actor
    }
    
    // 将一个 actor(函数)添加到group,每一个 actor 必须预占一个中断函数,如果调用了中断函数,execute必须return
    // 此外,即使在执行 return 后调用 interrupt 也必须是安全的
    // 第一个 return 的 actor(函数)中断所有正在运行的 actors; error 传递给 interrupt 函数并有 Run 返回
    func (g *Group) Add(execute func() error, interrupt func(error)) {
      g.actors = append(g.actors, actor{execute, interrupt})
    }
    // Run 并行运行所有 actors; 当第一个 actor return,所有其他的 actors被中断
    // 只有当所有的 actors(函数)都退出 Run 才能 return; Run 返回的 error 是第一个 actor 返回的 error
    func (g *Group) Run() error {
      if len(g.actors) == 0 {
        return nil
      }
    
      // Run each actor.
      errors := make(chan error, len(g.actors))
      for _, a := range g.actors {
        go func(a actor) {
          errors <- a.execute()
        }(a)
      }
    
      // Wait for the first actor to stop.
      err := <-errors
    
      // Signal all actors to stop.
      for _, a := range g.actors {
        a.interrupt(err)
      }
    
      // Wait for all actors to stop.
      for i := 1; i < cap(errors); i++ {
        <-errors
      }
    
      // Return the original error.
      return err
    }
    
    type actor struct {
      execute   func() error
      interrupt func(error)
    }
    
    ## example in prometheus main.go 478~725 (Fragment)
    var g run.Group
    {
        // Scrape discovery manager.
        g.Add(
          func() error {
            err := discoveryManagerScrape.Run()
            level.Info(logger).Log("msg", "Scrape discovery manager stopped")
            return err
          },
          func(err error) {
            level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
            cancelScrape()
          },
        )
      }
    {
        // Scrape manager.
        g.Add(
          func() error {
            // When the scrape manager receives a new targets list
            // it needs to read a valid config for each job.
            // It depends on the config being in sync with the discovery manager so
            // we wait until the config is fully loaded.
            <-reloadReady.C
    
            err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
            level.Info(logger).Log("msg", "Scrape manager stopped")
            return err
          },
          func(err error) {
            // Scrape manager needs to be stopped before closing the local TSDB
            // so that it doesn't try to write samples to a closed storage.
            level.Info(logger).Log("msg", "Stopping scrape manager...")
            scrapeManager.Stop()
          },
        )
    }
    ...
    if err := g.Run(); err != nil {
        level.Error(logger).Log("err", err)
        os.Exit(1)
    }
    
    

    服务发现机制

    Prometheus作为云时代最受欢迎的监控系统之一,它的服务发现机制在其中发挥着重要作用。它能够支持包括 Kubernetes、Azure、EC2、GCE、OpenStack、Consul、Marathon、Zookeeper及静态文件类型等十几种服务发现的提供者。通过Discovery Manager管理所有provider,定时(默认5秒)更新服务发现的Targets,根据各个Targets的Scrape配置文件Pull监控数据,转换成保存于本地TSDB时序数据库。

    源码片段

    ## discovery/manager.go
    // Discover 提供关于抓取目标组的信息,它维护一组TargetGroups源,只要发现provider检测到潜在的更改,它就会通过channel发送TargetGroup
    type Discoverer interface {
      // Run hands a channel to the discovery provider (Consul, DNS etc) through which it can send updated target groups.
      // Must returns if the context gets canceled. It should not close the update channel on returning.
      Run(ctx context.Context, up chan<- []*targetgroup.Group)
    }
    
    // provider holds a Discoverer instance, its configuration and its subscribers.
    type provider struct {
      name   string
      d      Discoverer
      subs   []string
      config interface{}
    }
    
    // Manager维护一组服务发现提供者并且将每一个更新发送到map channel; Targets 通过target名称分组
    type Manager struct {
      logger         log.Logger
      name           string
      mtx            sync.RWMutex
      ctx            context.Context
      discoverCancel []context.CancelFunc
    
      // Some Discoverers(eg. k8s) send only the updates for a given target group
      // so we use map[tg.Source]*targetgroup.Group to know which group to update.
      targets map[poolKey]map[string]*targetgroup.Group
      // providers keeps track of SD providers.
      providers []*provider
      // The sync channel sends the updates as a map where the key is the job value from the scrape config.
      syncCh chan map[string][]*targetgroup.Group
    
      // How long to wait before sending updates to the channel. The variable
      // should only be modified in unit tests.
      updatert time.Duration
    
      // The triggerSend channel signals to the manager that new updates have been received from providers.
      triggerSend chan struct{}
    }
    
    

    Scrape流程

    1、服务发现active targets即Prometheus Pull监控指标的目标endpoint。
    2、ScrapeManager以map形式维护各个Targets的抓取配置、抓取池和Target Group,其key是target labels的hash值。
    3、它的Run方法间隔5秒钟触发reloader,检测当前scrapePools中的targets group,如果scrape configs中也存在则初始化sp,并行同步更新targets group中所有targets。
    4、计算target的key(t.hash()),如果当前activeTargets中不存在该key,则在activeTargets及loops中添加该target,并新启一个goroutine后台执行loop.run()方法;如果这个key对应的target已经存在于activeTargets则更新target的discoveredLabels。

    image

    5.遍历更新完当前targetsgroup中所有targets,从当前activeTargets中移除旧的targets并停止scraper的loops

    image

    scraper loop :是一个interface,其中有两个方法run和stop; run执行抓取任务将抓取的时序数据append到TSDB, stop则停止抓取任务

    image

    scrapeLoop 实现 loop 接口,核心代码块摘要

    scraper run方法,执行http/https Get请求到targets的metricPath拉取(scrape)监控数据缓存到buf,然后将时序数据append到TSDB,存储的时间戳timestamp是抓取监控指标的时间(正常情况下严格递增)

    image

    scrape stop 方法 停止scrape

    image

    TSDB 存储 sample

    Prometheus TSDB参考 Facebook Gorilla [2] 时序数据库原理实现,其核心主要是对timestamp和时序数据value值的高压缩。在时序的场景中,将每个时序对可以视为一对64的值,以时间轴为横坐标表示该点的timestamp,时序值value为该点的纵坐标,每一个时序对按时间轴递增而时序值根据采集到的实际值变动。

    type sample struct {
      metric  labels.Labels
      t       int64
      v       float64
    }
    
    

    TSDB如何保证时序递增

    tsdb基于delta-of-delta编码的方式压缩timestamp,基于XOR方式压缩时序点value的浮点值,这种压缩方式可以将16byte的时序点压缩到1.37byte,减少10x存储空间,也减少73x查询延迟同时提高14x查询吞吐。

    delta-of-delta 使用变长编码算法(简称 dod)
    1.块头存储完整的起始时间戳t-1,它与两小时(默认块大小)窗口对齐;块中的第一个时间戳t0被存储为14比特的t-1的增量(delta = t0 - th)
    2.(a) 计算delta-of-delta  D = (tn - tn-1) - (tn-1 - tn-2)
    (b) 根据D的取值范围分配不同位存储不同值
      D = 0                 1bit         '0'
      D ∈ [-8191,8192]      14bits       '10'
      D ∈ [-65535,65536]    17bits       '110'
      D ∈ [-524287,524288]  20bits       '1110'
      D ∈ 其他值             64bits       '1111'
    
    
    tDelta = uint64(t - a.t), golang uint64类型取值非负保证了时序的递增性,不会出现乱序
    image
    XOR方法压缩浮点值value
    1.直接存储时序中第一个时序点value值,不做压缩
    2.从第二个时序点开始,将value值与前一时序点value值进行XOR运算
    3.如果XOR运算结果为0,则表示这两个时序点的Value值相同,只需要使用一个bit位存储'0'值即可
    4.如果XOR运算结果不是0,则需要使用到2个bit值的控制位,并首先将第一个bit存储为'1',接下来看控制位的第二个bit值
      (a)控制位第二个bit位为'0'时,表示此次计算得到的XOR结果中间非零的部分被前一个XOR运算结果包含.例如,与前一个XOR运算结果相比,此次XOR运算结果也有同样多的前置0和同样多的尾部0,那么我们只需要存储XOR结果中非0的部分即可
      (b)控制位第二个bit位为'1'时,需要用5个bit位来存储XOR结果中前置0的数量,然后用6个bit位来存储XOR结果中非0位的长度,最后再存储XOR的值
    
    
    image

    小结

    prometheus核心是一个pull-based system,它的整个生态系统设计也是基于这种方式。这个方式的优缺点在文中也圈点出来

    • 优点1 :Prometheus完善的服务发现机制兼容当前几乎所有平台,以kubernetes容器云监控为例,实现自动发现并监控资源对象生成告警策略,大大解放运维成本。
    • 优点2: 时序数据以抓取时间作为timestamp存储,正常情况下可以保证timestamp严格递增(排除破坏性更改系统时间的行为),在某些场景下这也成为它的一个缺点。
    • 缺点 :在实际生产中复杂的云环境下,若要考虑兼容平台原有监控系统,收集客户端上报的的监控数据经过parser转换成prometheus格式的监控数据。然后等待prometheus pull,然后以pull的时间保存时序数据,这其中的时延依网络环境变得非常不可控,这对于时间敏感性高的监控指标来说是硬伤。

    于是在构思一种让Prometheus接收数据的方式,timestamp使用指标采集时间,不经过prometheus服务发现模块,数据预处理后直接写入TSDB。接收到的数据首先经过预处理(监控项、标签符合prometheus规范,时间必须在有效范围...); 对于push上来的监控项,由于其游离于discovery之外,如果想对某些监控项进行聚合计算、告警规则等配置的增删改就需要额外的策略来实现动态可配置。

    相关文章

      网友评论

          本文标题:Prometheus数据抓取及数据存储实现

          本文链接:https://www.haomeiwen.com/subject/cowqmhtx.html