Reading the Loki source: the ingester

Author: wwq2020 | Published 2020-06-18 13:42

    Introduction

    The ingester is responsible for writing logs to the long-term storage backend and for returning log data to queries.
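
    Before reading the internals, it helps to see a write from the outside. Below is a minimal sketch of pushing one log line to Loki's HTTP push endpoint (assuming the standard /loki/api/v1/push JSON format; the address and tenant ID are placeholders):

    package main

    import (
        "bytes"
        "fmt"
        "net/http"
        "time"
    )

    func main() {
        // One stream with one entry; timestamps are nanoseconds encoded as strings.
        body := fmt.Sprintf(`{"streams":[{"stream":{"job":"demo"},"values":[["%d","hello loki"]]}]}`,
            time.Now().UnixNano())

        req, _ := http.NewRequest("POST", "http://localhost:3100/loki/api/v1/push", bytes.NewBufferString(body))
        req.Header.Set("Content-Type", "application/json")
        // In multi-tenant mode the tenant travels in X-Scope-OrgID; the ingester
        // later recovers it via user.ExtractOrgID.
        req.Header.Set("X-Scope-OrgID", "tenant-1")

        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        fmt.Println(resp.Status) // 204 No Content on success
    }

    The distributor receives such a request, hashes each stream onto the ring, and forwards it to ingesters over gRPC, which is where the code below picks up.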

    Related source code

    In cmd/loki/main.go, main builds a Loki instance from the parsed config and runs it:

    func main() {
        ...
        t, err := loki.New(config)
        util.CheckFatal("initialising loki", err)
    
        level.Info(util.Logger).Log("msg", "Starting Loki", "version", version.Info())
    
        err = t.Run()
        ...
    }
    
    

    In pkg/loki/loki.go, New sets up the module manager; setupModuleManager registers every module with its init function and declares the dependency graph:

    func New(cfg Config) (*Loki, error) {
        loki := &Loki{
            cfg: cfg,
        }
    
        loki.setupAuthMiddleware()
        if err := loki.setupModuleManager(); err != nil {
            return nil, err
        }
        storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)
    
        return loki, nil
    }
    
    func (t *Loki) setupModuleManager() error {
        mm := modules.NewManager()
    
        mm.RegisterModule(Server, t.initServer)
        mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig)
        mm.RegisterModule(MemberlistKV, t.initMemberlistKV)
        mm.RegisterModule(Ring, t.initRing)
        mm.RegisterModule(Overrides, t.initOverrides)
        mm.RegisterModule(Distributor, t.initDistributor)
        mm.RegisterModule(Store, t.initStore)
        mm.RegisterModule(Ingester, t.initIngester)
        mm.RegisterModule(Querier, t.initQuerier)
        mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
        mm.RegisterModule(TableManager, t.initTableManager)
        mm.RegisterModule(All, nil)
    
        // Add dependencies
        deps := map[string][]string{
            Ring:          {RuntimeConfig, Server, MemberlistKV},
            Overrides:     {RuntimeConfig},
            Distributor:   {Ring, Server, Overrides},
            Store:         {Overrides},
            Ingester:      {Store, Server, MemberlistKV},
            Querier:       {Store, Ring, Server},
            QueryFrontend: {Server, Overrides},
            TableManager:  {Server},
            All:           {Querier, Ingester, Distributor, TableManager},
        }
    
        for mod, targets := range deps {
            if err := mm.AddDependency(mod, targets...); err != nil {
                return err
            }
        }
    
        t.moduleManager = mm
    
        return nil
    }
    
    func (t *Loki) Run() error {
        serviceMap, err := t.moduleManager.InitModuleServices(t.cfg.Target)
        if err != nil {
            return err
        }
    
        t.serviceMap = serviceMap
        ...
        var servs []services.Service
        for _, s := range serviceMap {
            servs = append(servs, s)
        }
        ...
        sm, err := services.NewManager(servs...)
        ...
        err = sm.StartAsync(context.Background())
    
        ...
    }
    

    In github.com/cortexproject/cortex/pkg/util/modules/modules.go, InitModuleServices initializes the target module's dependencies first, then the target itself, wrapping each init result as a service:

    func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
        if _, ok := m.modules[target]; !ok {
            return nil, fmt.Errorf("unrecognised module name: %s", target)
        }
        servicesMap := map[string]services.Service{}
    
        // initialize all of our dependencies first
        deps := m.orderedDeps(target)
        deps = append(deps, target) // lastly, initialize the requested module
    
        for ix, n := range deps {
            mod := m.modules[n]
    
            var serv services.Service
    
            if mod.initFn != nil {
                s, err := mod.initFn()
                if err != nil {
                    return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
                }
    
                if s != nil {
                    // We pass servicesMap, which isn't yet complete. By the time service starts,
                    // it will be fully built, so there is no need for extra synchronization.
                    serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:]))
                }
            }
    
            if serv != nil {
                servicesMap[n] = serv
            }
        }
    
        return servicesMap, nil
    }
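
    orderedDeps (not shown here) walks the dependency map so that every module comes after its dependencies. Here is a minimal sketch of that idea as a depth-first traversal (illustration only, not the exact Cortex implementation):

    // orderDeps returns target's transitive dependencies in initialization
    // order: dependencies always appear before their dependents. The target
    // itself is appended afterwards, as InitModuleServices does above.
    func orderDeps(deps map[string][]string, target string) []string {
        var order []string
        seen := map[string]bool{}
        var visit func(name string)
        visit = func(name string) {
            for _, d := range deps[name] {
                if !seen[d] {
                    seen[d] = true
                    visit(d)
                    order = append(order, d)
                }
            }
        }
        visit(target)
        return order
    }

    For the dependency map in setupModuleManager, orderDeps(deps, Ingester) yields RuntimeConfig, Overrides, Store, Server, MemberlistKV (modulo ordering among independent modules), so the server and store already exist by the time initIngester runs.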
    

    In github.com/cortexproject/cortex/pkg/util/services/manager.go, the service manager collects the module services and starts them all asynchronously:

    func NewManager(services ...Service) (*Manager, error) {
        if len(services) == 0 {
            return nil, errors.New("no services")
        }
    
        m := &Manager{
            services:  services,
            byState:   map[State][]Service{},
            healthyCh: make(chan struct{}),
            stoppedCh: make(chan struct{}),
        }
        ...
        return m, nil
    }
    
    func (m *Manager) StartAsync(ctx context.Context) error {
        for _, s := range m.services {
            err := s.StartAsync(ctx)
            if err != nil {
                return err
            }
        }
        return nil
    }
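
    Manager drives every service through the shared state machine (New → Starting → Running → Stopping → Terminated, or Failed on error). A minimal usage sketch against the same package follows; treat AwaitHealthy and AwaitStopped as assumptions based on the Cortex services API:

    // Sketch: start a manager over one no-op service and wait for health.
    idle := services.NewIdleService(nil, nil) // a service with no start/stop work

    m, err := services.NewManager(idle)
    if err != nil {
        return err
    }
    if err := m.StartAsync(context.Background()); err != nil {
        return err
    }
    // Blocks until every service is Running (or one of them fails).
    if err := m.AwaitHealthy(context.Background()); err != nil {
        return err
    }
    // Shutdown: request stop, then wait for Terminated.
    m.StopAsync()
    _ = m.AwaitStopped(context.Background())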
    

    In pkg/loki/modules.go, initStore builds the chunk store (choosing the boltdb-shipper mode by target), and initIngester builds the ingester and registers its gRPC and HTTP endpoints:

    func (t *Loki) initStore() (_ services.Service, err error) {
        if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
            t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
            switch t.cfg.Target {
            case Ingester:
                // We do not want ingester to unnecessarily keep downloading files
                t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeWriteOnly
            case Querier:
                // We do not want query to do any updates to index
                t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadOnly
            default:
                t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadWrite
            }
        }
    
        t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
        if err != nil {
            return
        }
    
        return services.NewIdleService(nil, func(_ error) error {
            t.store.Stop()
            return nil
        }), nil
    }
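
    activePeriodConfig (not shown) selects the schema period currently in effect: Loki's schema config is a list of periods, each starting at a From date, and the active one is the last whose From is not in the future. A minimal sketch of that selection (illustration only, with plain time.Time in place of the real config types):

    // pickActive returns the index of the last period whose start is at or
    // before now; this is the job activePeriodConfig performs on SchemaConfig.
    func pickActive(froms []time.Time, now time.Time) int {
        active := 0
        for i, from := range froms {
            if !from.After(now) {
                active = i
            }
        }
        return active
    }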
    
    func (t *Loki) initIngester() (_ services.Service, err error) {
        t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
        t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
        t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort
    
        // We want ingester to also query the store when using boltdb-shipper
        pc := activePeriodConfig(t.cfg.SchemaConfig)
        if pc.IndexType == local.BoltDBShipperType {
            t.cfg.Ingester.QueryStore = true
            mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
            if err != nil {
                return nil, err
            }
            t.cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
        }
    
        t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, prometheus.DefaultRegisterer)
        if err != nil {
            return
        }
    
        logproto.RegisterPusherServer(t.server.GRPC, t.ingester)
        logproto.RegisterQuerierServer(t.server.GRPC, t.ingester)
        logproto.RegisterIngesterServer(t.server.GRPC, t.ingester)
        grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
        t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
        return t.ingester, nil
    }
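
    With the Pusher, Querier, and Ingester gRPC services registered, the distributor reaches the ingester through the generated logproto client. A hedged sketch of such a call (address, tenant, and connection options are placeholders; field names follow the logproto definitions used above):

    conn, err := grpc.Dial("ingester:9095", grpc.WithInsecure())
    if err != nil {
        return err
    }
    defer conn.Close()

    client := logproto.NewPusherClient(conn)

    // user.InjectOrgID attaches the tenant to the context; in a real
    // deployment the gRPC client middleware propagates it as metadata so
    // user.ExtractOrgID works on the ingester side.
    ctx := user.InjectOrgID(context.Background(), "tenant-1")

    _, err = client.Push(ctx, &logproto.PushRequest{
        Streams: []logproto.Stream{{
            Labels:  `{job="demo"}`,
            Entries: []logproto.Entry{{Timestamp: time.Now(), Line: "hello"}},
        }},
    })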
    

    In pkg/ingester/ingester.go, New joins the ring via a lifecycler, starting launches the flush workers, loop periodically sweeps chunks for flushing, and Push hands each request to the per-tenant instance:

    func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
        if cfg.ingesterClientFactory == nil {
            cfg.ingesterClientFactory = client.New
        }
        enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
        if err != nil {
            return nil, err
        }
    
        i := &Ingester{
            cfg:          cfg,
            clientConfig: clientConfig,
            instances:    map[string]*instance{},
            store:        store,
            loopQuit:     make(chan struct{}),
            flushQueues:  make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
            tailersQuit:  make(chan struct{}),
            factory: func() chunkenc.Chunk {
                return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize)
            },
        }
    
        i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer)
        if err != nil {
            return nil, err
        }
    
        i.lifecyclerWatcher = services.NewFailureWatcher()
        i.lifecyclerWatcher.WatchService(i.lifecycler)
    
        // Now that the lifecycler has been created, we can create the limiter
        // which depends on it.
        i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
    
        i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
        return i, nil
    }
    
    func (i *Ingester) starting(ctx context.Context) error {
        i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
        for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
            i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength)
            go i.flushLoop(j)
        }
    
        // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
        err := i.lifecycler.StartAsync(context.Background())
        if err != nil {
            return err
        }
    
        err = i.lifecycler.AwaitRunning(ctx)
        if err != nil {
            return err
        }
    
        // start our loop
        i.loopDone.Add(1)
        go i.loop()
        return nil
    }
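
    starting uses a standard worker-pool launch: size the WaitGroup to the worker count up front, start one goroutine per flush queue, and wait on the group during shutdown once the queues are closed. The pattern in isolation (a generic sketch, not Loki code):

    var done sync.WaitGroup

    const workers = 4
    done.Add(workers) // the count is known before any worker runs
    for j := 0; j < workers; j++ {
        go func(j int) {
            defer done.Done()
            // drain queue j until it is closed, as flushLoop does below
        }(j)
    }

    // on shutdown, after closing the queues:
    done.Wait()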
    
    func (i *Ingester) loop() {
        defer i.loopDone.Done()
    
        flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
        defer flushTicker.Stop()
    
        for {
            select {
            case <-flushTicker.C:
                i.sweepUsers(false)
    
            case <-i.loopQuit:
                return
            }
        }
    }
    
    func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
        instanceID, err := user.ExtractOrgID(ctx)
        if err != nil {
            return nil, err
        } else if i.readonly {
            return nil, ErrReadOnly
        }
    
        instance := i.getOrCreateInstance(instanceID)
        err = instance.Push(ctx, req)
        return &logproto.PushResponse{}, err
    }
    
    func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
        inst, ok := i.getInstanceByID(instanceID)
        if ok {
            return inst
        }
    
        i.instancesMtx.Lock()
        defer i.instancesMtx.Unlock()
        inst, ok = i.instances[instanceID]
        if !ok {
            inst = newInstance(&i.cfg, instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
            i.instances[instanceID] = inst
        }
        return inst
    }
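
    getOrCreateInstance is the classic double-checked locking pattern: an optimistic lookup under a read lock (inside getInstanceByID), then the write lock with a second check so that two goroutines racing on the same tenant do not both create an instance. The same pattern in isolation (a generic sketch, not Loki code):

    type registry struct {
        mtx   sync.RWMutex
        items map[string]*item
    }

    type item struct{}

    func (r *registry) getOrCreate(id string) *item {
        // Fast path: read lock only, so concurrent readers don't serialize.
        r.mtx.RLock()
        it, ok := r.items[id]
        r.mtx.RUnlock()
        if ok {
            return it
        }

        // Slow path: take the write lock and check again, because another
        // goroutine may have created the item between the two locks.
        r.mtx.Lock()
        defer r.mtx.Unlock()
        if it, ok := r.items[id]; ok {
            return it
        }
        it = &item{}
        r.items[id] = it
        return it
    }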
    

    In pkg/ingester/flush.go, each flush worker drains its queue and writes completed chunks to the store:

    func (i *Ingester) flushLoop(j int) {
        defer func() {
            level.Debug(util.Logger).Log("msg", "Ingester.flushLoop() exited")
            i.flushQueuesDone.Done()
        }()
    
        for {
            o := i.flushQueues[j].Dequeue()
            if o == nil {
                return
            }
            op := o.(*flushOp)
    
            level.Debug(util.Logger).Log("msg", "flushing stream", "userid", op.userID, "fp", op.fp, "immediate", op.immediate)
    
            err := i.flushUserSeries(op.userID, op.fp, op.immediate)
            if err != nil {
                level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
            }
    
            // If we're exiting & we failed to flush, put the failed operation
            // back in the queue at a later point.
            if op.immediate && err != nil {
                op.from = op.from.Add(flushBackoff)
                i.flushQueues[j].Enqueue(op)
            }
        }
    }
    
    func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
        instance, ok := i.getInstanceByID(userID)
        if !ok {
            return nil
        }
    
        chunks, labels := i.collectChunksToFlush(instance, fp, immediate)
        if len(chunks) < 1 {
            return nil
        }
    
        ctx := user.InjectOrgID(context.Background(), userID)
        ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
        defer cancel()
        err := i.flushChunks(ctx, fp, labels, chunks)
        if err != nil {
            return err
        }
    
        instance.streamsMtx.Lock()
        for _, chunk := range chunks {
            chunk.flushed = time.Now()
        }
        instance.streamsMtx.Unlock()
        return nil
    }
    
    
    func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error {
        ...
        if err := i.store.Put(ctx, wireChunks); err != nil {
            return err
        }
        ...
        return nil
    }
    
    

    In pkg/ingester/instance.go, an instance holds one tenant's streams and appends incoming entries to them:

    func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
        i := &instance{
            cfg:        cfg,
            streams:    map[model.Fingerprint]*stream{},
            index:      index.New(),
            instanceID: instanceID,
    
            streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
            streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
    
            factory: factory,
            tailers: map[uint32]*tailer{},
            limiter: limiter,
    
            syncPeriod:  syncPeriod,
            syncMinUtil: syncMinUtil,
        }
        i.mapper = newFPMapper(i.getLabelsFromFingerprint)
        return i
    }
    
    func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
        i.streamsMtx.Lock()
        defer i.streamsMtx.Unlock()
    
        var appendErr error
        for _, s := range req.Streams {
    
            stream, err := i.getOrCreateStream(s)
            if err != nil {
                appendErr = err
                continue
            }
    
            prevNumChunks := len(stream.chunks)
            if err := stream.Push(ctx, s.Entries, i.syncPeriod, i.syncMinUtil); err != nil {
                appendErr = err
                continue
            }
    
            memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks))
        }
    
        return appendErr
    }
    
    func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
        ...
        stream = newStream(i.cfg, fp, sortedLabels, i.factory)
        ...
    }
    

    In pkg/ingester/stream.go, a stream appends entries to its head chunk:

    func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
        ...
        chunk := &s.chunks[len(s.chunks)-1]
        ...
        if err := chunk.chunk.Append(&entries[i]); err != nil {
        ...
    }
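
    The elided parts of stream.Push iterate over the entries, cut a fresh head chunk whenever the current one is full (or when the synchronization heuristic fires), and append each entry to the head. The append-or-cut shape in isolation (a self-contained sketch with simplified types, not the real chunkenc API):

    type entry struct {
        ts   time.Time
        line string
    }

    type memChunk struct {
        entries []entry
        max     int
    }

    func (c *memChunk) full() bool { return len(c.entries) >= c.max }

    // push appends entries, cutting a new chunk whenever the head fills up.
    func push(chunks []memChunk, entries []entry, maxPerChunk int) []memChunk {
        if len(chunks) == 0 {
            chunks = append(chunks, memChunk{max: maxPerChunk})
        }
        for _, e := range entries {
            head := &chunks[len(chunks)-1]
            if head.full() {
                chunks = append(chunks, memChunk{max: maxPerChunk})
                head = &chunks[len(chunks)-1]
            }
            head.entries = append(head.entries, e)
        }
        return chunks
    }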
    

    In github.com/cortexproject/cortex/pkg/util/services/basic_service.go, BasicService implements the service state machine used by all of these components:

    func NewBasicService(start StartingFn, run RunningFn, stop StoppingFn) *BasicService {
        return &BasicService{
            startFn:             start,
            runningFn:           run,
            stoppingFn:          stop,
            state:               New,
            runningWaitersCh:    make(chan struct{}),
            terminatedWaitersCh: make(chan struct{}),
        }
    }
    
    func (b *BasicService) StartAsync(parentContext context.Context) error {
        switched, oldState := b.switchState(New, Starting, func() {
            b.serviceContext, b.serviceCancel = context.WithCancel(parentContext)
            b.notifyListeners(func(l Listener) { l.Starting() }, false)
            go b.main()
        })
    
        if !switched {
            return invalidServiceStateError(oldState, New)
        }
        return nil
    }
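
    The ingester built in New is itself a BasicService (its starting/running/stopping functions appear above), so callers drive it through this same state machine. A minimal usage sketch follows; StartAndAwaitRunning and StopAndAwaitTerminated are convenience helpers in the Cortex services package (treat their presence as an assumption):

    svc := services.NewBasicService(
        func(ctx context.Context) error { return nil },               // starting: allocate resources
        func(ctx context.Context) error { <-ctx.Done(); return nil }, // running: block until asked to stop
        func(failure error) error { return nil },                     // stopping: clean up
    )

    ctx := context.Background()
    if err := services.StartAndAwaitRunning(ctx, svc); err != nil {
        return err
    }
    // ... service is Running ...
    if err := services.StopAndAwaitTerminated(ctx, svc); err != nil {
        return err
    }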
    
