简介
ingester负责将日志数据写入长期存储后端,并在查询时返回日志数据
相关源码
cmd/loki/main.go中
// main is the entry point of the Loki binary. This is an excerpt: each "..."
// line marks code that the article left out.
func main() {
...
// Construct the Loki instance from config; the error is handed to
// util.CheckFatal together with a description of the failed step.
t, err := loki.New(config)
util.CheckFatal("initialising loki", err)
level.Info(util.Logger).Log("msg", "Starting Loki", "version", version.Info())
// Run initialises the module services for the configured target and starts them.
err = t.Run()
...
}
pkg/loki/loki.go中
// New creates a Loki instance holding cfg. It calls setupAuthMiddleware,
// builds the module manager (returning its error, if any), and finally
// registers the custom index clients for cfg.StorageConfig with the default
// Prometheus registerer.
func New(cfg Config) (*Loki, error) {
	instance := &Loki{cfg: cfg}

	instance.setupAuthMiddleware()

	err := instance.setupModuleManager()
	if err != nil {
		return nil, err
	}

	storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)

	return instance, nil
}
// setupModuleManager creates a module manager, registers every Loki module
// with its init function, declares the dependencies between modules, and
// stores the manager on t. The manager is only stored when every dependency
// was added successfully; the first AddDependency error is returned as-is.
func (t *Loki) setupModuleManager() error {
	manager := modules.NewManager()

	// Module registration. All has no init function of its own: it only
	// exists to pull in its dependencies.
	manager.RegisterModule(Server, t.initServer)
	manager.RegisterModule(RuntimeConfig, t.initRuntimeConfig)
	manager.RegisterModule(MemberlistKV, t.initMemberlistKV)
	manager.RegisterModule(Ring, t.initRing)
	manager.RegisterModule(Overrides, t.initOverrides)
	manager.RegisterModule(Distributor, t.initDistributor)
	manager.RegisterModule(Store, t.initStore)
	manager.RegisterModule(Ingester, t.initIngester)
	manager.RegisterModule(Querier, t.initQuerier)
	manager.RegisterModule(QueryFrontend, t.initQueryFrontend)
	manager.RegisterModule(TableManager, t.initTableManager)
	manager.RegisterModule(All, nil)

	// Module name -> modules that must be initialised before it.
	dependencies := map[string][]string{
		Ring:          {RuntimeConfig, Server, MemberlistKV},
		Overrides:     {RuntimeConfig},
		Distributor:   {Ring, Server, Overrides},
		Store:         {Overrides},
		Ingester:      {Store, Server, MemberlistKV},
		Querier:       {Store, Ring, Server},
		QueryFrontend: {Server, Overrides},
		TableManager:  {Server},
		All:           {Querier, Ingester, Distributor, TableManager},
	}

	for name, requires := range dependencies {
		err := manager.AddDependency(name, requires...)
		if err != nil {
			return err
		}
	}

	t.moduleManager = manager
	return nil
}
// Run initialises the services needed by the configured target and starts
// them through a service manager. This is an excerpt: each "..." line marks
// code that the article left out.
func (t *Loki) Run() error {
// Initialise the target module and its dependencies; the result maps module
// name to service.
serviceMap, err := t.moduleManager.InitModuleServices(t.cfg.Target)
if err != nil {
return err
}
t.serviceMap = serviceMap
...
// Flatten the map into a slice because services.NewManager is variadic.
var servs []services.Service
for _, s := range serviceMap {
servs = append(servs, s)
}
...
sm, err := services.NewManager(servs...)
...
// A fresh background context is passed as the parent context of the services.
err = sm.StartAsync(context.Background())
...
}
github.com/cortexproject/cortex/pkg/util/modules/modules.go中
// InitModuleServices initialises the target module and all of its
// dependencies, dependencies first, and returns the resulting services keyed
// by module name. Modules without an init function, or whose init function
// returns a nil service, get no entry. An unknown target or a failing init
// function yields an error and a nil map.
func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
	if _, known := m.modules[target]; !known {
		return nil, fmt.Errorf("unrecognised module name: %s", target)
	}

	result := map[string]services.Service{}

	// Dependencies are initialised first; the requested module goes last.
	initOrder := append(m.orderedDeps(target), target)

	for pos, name := range initOrder {
		module := m.modules[name]
		if module.initFn == nil {
			continue
		}

		svc, err := module.initFn()
		if err != nil {
			return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", name))
		}
		if svc == nil {
			continue
		}

		// result is not complete yet when handed to the wrapper. It will be
		// fully built by the time the service starts, so no extra
		// synchronization is needed.
		var wrapped services.Service = newModuleServiceWrapper(result, name, svc, module.deps, m.findInverseDependencies(name, initOrder[pos+1:]))
		if wrapped != nil {
			result[name] = wrapped
		}
	}

	return result, nil
}
github.com/cortexproject/cortex/pkg/util/services/manager.go中
// NewManager creates a Manager for the given services. It returns an error
// when called without any service. This is an excerpt: the "..." line marks
// code that the article left out.
func NewManager(services ...Service) (*Manager, error) {
if len(services) == 0 {
return nil, errors.New("no services")
}
m := &Manager{
services: services,
byState: map[State][]Service{},
healthyCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
...
return m, nil
}
// StartAsync calls StartAsync(ctx) on every managed service in order. It
// stops at, and returns, the first error; services later in the list are
// then not started.
func (m *Manager) StartAsync(ctx context.Context) error {
	for idx := range m.services {
		if err := m.services[idx].StartAsync(ctx); err != nil {
			return err
		}
	}
	return nil
}
pkg/loki/modules.go中
// initStore creates the store and assigns it to t.store. When the active
// period config uses the boltdb-shipper index type, the shipper is first
// given the ingester's lifecycler ID as its name and a mode that depends on
// the target. The returned idle service stops the store when it is stopped.
func (t *Loki) initStore() (_ services.Service, err error) {
	if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
		shipperCfg := &t.cfg.StorageConfig.BoltDBShipperConfig
		shipperCfg.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID

		switch target := t.cfg.Target; target {
		case Ingester:
			// An ingester should not keep downloading index files needlessly.
			shipperCfg.Mode = local.ShipperModeWriteOnly
		case Querier:
			// A querier must never update the index.
			shipperCfg.Mode = local.ShipperModeReadOnly
		default:
			shipperCfg.Mode = local.ShipperModeReadWrite
		}
	}

	t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
	if err != nil {
		return nil, err
	}

	// The service has no start function; stopping it stops the store.
	stopStore := func(_ error) error {
		t.store.Stop()
		return nil
	}
	return services.NewIdleService(nil, stopStore), nil
}
// initIngester completes the ingester configuration, creates the ingester,
// registers it on the gRPC server as Pusher, Querier, Ingester and health
// server, and exposes its flush handler under the HTTP path /flush. The
// ingester itself is returned as the module's service.
func (t *Loki) initIngester() (_ services.Service, err error) {
	lifecyclerCfg := &t.cfg.Ingester.LifecyclerConfig
	lifecyclerCfg.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
	lifecyclerCfg.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
	lifecyclerCfg.ListenPort = t.cfg.Server.GRPCListenPort

	// With boltdb-shipper the ingester must also query the store.
	if period := activePeriodConfig(t.cfg.SchemaConfig); period.IndexType == local.BoltDBShipperType {
		t.cfg.Ingester.QueryStore = true

		lookBack, lookBackErr := calculateMaxLookBack(period, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
		if lookBackErr != nil {
			return nil, lookBackErr
		}
		t.cfg.Ingester.QueryStoreMaxLookBackPeriod = lookBack
	}

	t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, prometheus.DefaultRegisterer)
	if err != nil {
		return nil, err
	}

	grpcServer := t.server.GRPC
	logproto.RegisterPusherServer(grpcServer, t.ingester)
	logproto.RegisterQuerierServer(grpcServer, t.ingester)
	logproto.RegisterIngesterServer(grpcServer, t.ingester)
	grpc_health_v1.RegisterHealthServer(grpcServer, t.ingester)

	flushHandler := http.HandlerFunc(t.ingester.FlushHandler)
	t.server.HTTP.Path("/flush").Handler(flushHandler)

	return t.ingester, nil
}
pkg/ingester/ingester.go中
// New creates an Ingester from cfg. It defaults the ingester client factory
// to client.New, parses the configured chunk encoding, creates the ring
// lifecycler together with a failure watcher for it, builds the limiter on
// top of the lifecycler, and wraps the starting/running/stopping callbacks in
// a basic service. An invalid chunk encoding or a lifecycler creation failure
// is returned as an error.
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
	if cfg.ingesterClientFactory == nil {
		cfg.ingesterClientFactory = client.New
	}

	encoding, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
	if err != nil {
		return nil, err
	}

	// Every new in-memory chunk uses the configured encoding and sizes.
	newChunk := func() chunkenc.Chunk {
		return chunkenc.NewMemChunk(encoding, cfg.BlockSize, cfg.TargetChunkSize)
	}

	ing := &Ingester{
		cfg:          cfg,
		clientConfig: clientConfig,
		instances:    map[string]*instance{},
		store:        store,
		loopQuit:     make(chan struct{}),
		flushQueues:  make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
		tailersQuit:  make(chan struct{}),
		factory:      newChunk,
	}

	ing.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, ing, "ingester", ring.IngesterRingKey, true, registerer)
	if err != nil {
		return nil, err
	}

	ing.lifecyclerWatcher = services.NewFailureWatcher()
	ing.lifecyclerWatcher.WatchService(ing.lifecycler)

	// The limiter depends on the lifecycler, so it can only be built now.
	ing.limiter = NewLimiter(limits, ing.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

	ing.Service = services.NewBasicService(ing.starting, ing.running, ing.stopping)
	return ing, nil
}
// starting is the start callback of the ingester service. It creates one
// flush queue and one flushLoop goroutine per configured concurrent flush,
// starts the lifecycler and waits for it to be running, and finally launches
// the periodic loop goroutine.
func (i *Ingester) starting(ctx context.Context) error {
	i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
	for worker := 0; worker < i.cfg.ConcurrentFlushes; worker++ {
		i.flushQueues[worker] = util.NewPriorityQueue(flushQueueLength)
		go i.flushLoop(worker)
	}

	// The lifecycler gets its own context so that it does not stop
	// automatically once the ingester's service context is done.
	if err := i.lifecycler.StartAsync(context.Background()); err != nil {
		return err
	}

	if err := i.lifecycler.AwaitRunning(ctx); err != nil {
		return err
	}

	// Kick off the periodic loop.
	i.loopDone.Add(1)
	go i.loop()

	return nil
}
// loop calls sweepUsers(false) every FlushCheckPeriod until loopQuit delivers
// a value or is closed. It marks loopDone as done when it exits.
func (i *Ingester) loop() {
	defer i.loopDone.Done()

	ticker := time.NewTicker(i.cfg.FlushCheckPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-i.loopQuit:
			return
		case <-ticker.C:
			i.sweepUsers(false)
		}
	}
}
// Push handles a push request. The org ID extracted from ctx selects the
// per-tenant instance (created on first use) that receives the request.
// It returns a nil response when the org ID cannot be extracted or when the
// ingester is read-only (ErrReadOnly); otherwise it returns an empty response
// together with the error of the instance push, if any.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
	orgID, err := user.ExtractOrgID(ctx)
	if err != nil {
		return nil, err
	}
	if i.readonly {
		return nil, ErrReadOnly
	}

	pushErr := i.getOrCreateInstance(orgID).Push(ctx, req)
	return &logproto.PushResponse{}, pushErr
}
// getOrCreateInstance returns the instance registered under instanceID,
// creating and registering it when none exists yet. After the initial lookup
// misses, the map is checked again while holding instancesMtx, so an instance
// registered in the meantime is reused rather than replaced.
func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
	if existing, found := i.getInstanceByID(instanceID); found {
		return existing
	}

	i.instancesMtx.Lock()
	defer i.instancesMtx.Unlock()

	if existing, found := i.instances[instanceID]; found {
		return existing
	}

	created := newInstance(&i.cfg, instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
	i.instances[instanceID] = created
	return created
}
pkg/ingester/flush.go中
// flushLoop is the worker for flush queue j. It dequeues flush operations and
// flushes the referenced user series until Dequeue returns nil, then marks
// flushQueuesDone as done. A failed flush is logged; if the operation was an
// immediate one, it is re-enqueued with its from time pushed back by
// flushBackoff.
func (i *Ingester) flushLoop(j int) {
	defer func() {
		level.Debug(util.Logger).Log("msg", "Ingester.flushLoop() exited")
		i.flushQueuesDone.Done()
	}()

	for item := i.flushQueues[j].Dequeue(); item != nil; item = i.flushQueues[j].Dequeue() {
		op := item.(*flushOp)

		level.Debug(util.Logger).Log("msg", "flushing stream", "userid", op.userID, "fp", op.fp, "immediate", op.immediate)

		err := i.flushUserSeries(op.userID, op.fp, op.immediate)
		if err == nil {
			continue
		}
		level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)

		// If we're exiting and the flush failed, put the failed operation
		// back in the queue at a later point.
		if op.immediate {
			op.from = op.from.Add(flushBackoff)
			i.flushQueues[j].Enqueue(op)
		}
	}
}
// flushUserSeries flushes the chunks of the stream with fingerprint fp that
// belongs to userID. It returns nil without doing anything when the user has
// no instance or no chunks were collected for flushing. The flush runs under
// a context that carries the org ID and is bounded by FlushOpTimeout. Once
// the flush succeeded, every flushed chunk gets its flushed timestamp set
// while holding the instance's streams lock.
func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
	inst, found := i.getInstanceByID(userID)
	if !found {
		return nil
	}

	pending, lbls := i.collectChunksToFlush(inst, fp, immediate)
	if len(pending) == 0 {
		return nil
	}

	flushCtx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), userID), i.cfg.FlushOpTimeout)
	defer cancel()

	if err := i.flushChunks(flushCtx, fp, lbls, pending); err != nil {
		return err
	}

	inst.streamsMtx.Lock()
	for idx := range pending {
		pending[idx].flushed = time.Now()
	}
	inst.streamsMtx.Unlock()

	return nil
}
// flushChunks writes chunks to the ingester's store and returns the store's
// error if the write fails. This is an excerpt: each "..." line marks code
// that the article left out.
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error {
...
// wireChunks is defined in the elided code above — presumably built from cs;
// confirm against the full source.
if err := i.store.Put(ctx, wireChunks); err != nil {
return err
}
...
return nil
}
pkg/ingester/instance.go中
// newInstance creates the state for instanceID: empty stream and tailer maps,
// a fresh index, the stream created/removed counters labelled with the
// instance ID, and the supplied chunk factory, limiter and sync settings.
// The fingerprint mapper is attached last because it needs a method of the
// instance itself.
func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
	inst := &instance{
		cfg:        cfg,
		streams:    map[model.Fingerprint]*stream{},
		index:      index.New(),
		instanceID: instanceID,

		streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
		streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

		factory: factory,
		tailers: map[uint32]*tailer{},
		limiter: limiter,

		syncPeriod:  syncPeriod,
		syncMinUtil: syncMinUtil,
	}

	inst.mapper = newFPMapper(inst.getLabelsFromFingerprint)

	return inst
}
// Push appends the entries of every stream in req to the matching in-memory
// stream, creating streams as needed, while holding the streams lock. A
// failure on one stream does not stop the others from being processed; the
// error of the last failing stream is returned. For each successfully pushed
// stream, the memoryChunks metric is increased by the number of chunks the
// push added.
func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
	i.streamsMtx.Lock()
	defer i.streamsMtx.Unlock()

	var lastErr error
	for _, reqStream := range req.Streams {
		target, err := i.getOrCreateStream(reqStream)
		if err != nil {
			lastErr = err
			continue
		}

		chunksBefore := len(target.chunks)
		err = target.Push(ctx, reqStream.Entries, i.syncPeriod, i.syncMinUtil)
		if err != nil {
			lastErr = err
			continue
		}

		memoryChunks.Add(float64(len(target.chunks) - chunksBefore))
	}

	return lastErr
}
// getOrCreateStream returns the stream for pushReqStream. This is an excerpt:
// each "..." line marks code that the article left out; only the creation of
// a new stream is shown.
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
...
// fp and sortedLabels come from the elided code above.
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
...
}
pkg/ingester/stream.go中
// Push appends entries to the stream's chunks. This is an excerpt: each "..."
// line marks code that the article left out (including the loop that defines
// i and the closing of the if statement below).
func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
...
// Entries are appended to the last chunk in s.chunks.
chunk := &s.chunks[len(s.chunks)-1]
...
if err := chunk.chunk.Append(&entries[i]); err != nil {
...
}
github.com/cortexproject/cortex/pkg/util/services/basic_service.go中
// NewBasicService returns a BasicService in the New state that uses start,
// run and stop as its starting, running and stopping functions. The two
// waiter channels are created unbuffered.
func NewBasicService(start StartingFn, run RunningFn, stop StoppingFn) *BasicService {
	svc := &BasicService{state: New}

	svc.startFn = start
	svc.runningFn = run
	svc.stoppingFn = stop

	svc.runningWaitersCh = make(chan struct{})
	svc.terminatedWaitersCh = make(chan struct{})

	return svc
}
// StartAsync attempts to switch the service from New to Starting. The
// callback given to switchState derives the service context from
// parentContext, notifies listeners that the service is starting, and
// launches the service's main goroutine. When the state switch does not
// happen, an invalid-state error built from the state reported by
// switchState is returned.
func (b *BasicService) StartAsync(parentContext context.Context) error {
	onStarting := func() {
		b.serviceContext, b.serviceCancel = context.WithCancel(parentContext)
		b.notifyListeners(func(l Listener) { l.Starting() }, false)
		go b.main()
	}

	if switched, previous := b.switchState(New, Starting, onStarting); !switched {
		return invalidServiceStateError(previous, New)
	}
	return nil
}
网友评论