From 00d3c7a52d9f2b48fccb0cd5b105a2577b3d0305 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Fri, 31 May 2024 08:26:37 -0400 Subject: [PATCH] feat: add profile tagging to ingester (#13068) Signed-off-by: Edward Welch Co-authored-by: poyzannur Co-authored-by: Poyzan <31743851+poyzannur@users.noreply.github.com> --- pkg/ingester/ingester.go | 408 ++++++++++++++++++++++----------------- 1 file changed, 229 insertions(+), 179 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 6d27d349c93f..f23e1de227f6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -9,6 +9,7 @@ import ( "os" "path" "path/filepath" + "runtime/pprof" "sync" "time" @@ -837,7 +838,12 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro if err != nil { return &logproto.PushResponse{}, err } - return &logproto.PushResponse{}, instance.Push(ctx, req) + + pprof.Do(ctx, pprof.Labels("path", "write", "tenant", instanceID), func(c context.Context) { + err = instance.Push(ctx, req) + }) + + return &logproto.PushResponse{}, err } // GetStreamRates returns a response containing all streams and their current rate @@ -848,7 +854,11 @@ func (i *Ingester) GetStreamRates(ctx context.Context, _ *logproto.StreamRatesRe defer sp.LogKV("event", "ingester finished handling GetStreamRates") } - allRates := i.streamRateCalculator.Rates() + var allRates []logproto.StreamRate + pprof.Do(ctx, pprof.Labels("path", "write"), func(c context.Context) { + allRates = i.streamRateCalculator.Rates() + }) + rates := make([]*logproto.StreamRate, len(allRates)) for idx := range allRates { rates[idx] = &allRates[idx] @@ -902,39 +912,45 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie if err != nil { return err } - it, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req}) - if err != nil { - return err - } - if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { - storeReq := logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{ - Selector: req.Selector, - Direction: req.Direction, - Start: start, - End: end, - Limit: req.Limit, - Shards: req.Shards, - Deletes: req.Deletes, - Plan: req.Plan, - }} - storeItr, err := i.store.SelectLogs(ctx, storeReq) + pprof.Do(ctx, pprof.Labels("path", "read", "type", "log", "tenant", instanceID), func(c context.Context) { + var it iter.EntryIterator + it, err = instance.Query(ctx, logql.SelectLogParams{QueryRequest: req}) if err != nil { - util.LogErrorWithContext(ctx, "closing iterator", it.Close) - return err + return } - it = iter.NewMergeEntryIterator(ctx, []iter.EntryIterator{it, storeItr}, req.Direction) - } - defer util.LogErrorWithContext(ctx, "closing iterator", it.Close) + if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { + storeReq := logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{ + Selector: req.Selector, + Direction: req.Direction, + Start: start, + End: end, + Limit: req.Limit, + Shards: req.Shards, + Deletes: req.Deletes, + Plan: req.Plan, + }} + var storeItr iter.EntryIterator + storeItr, err = i.store.SelectLogs(ctx, storeReq) + if err != nil { + util.LogErrorWithContext(ctx, "closing iterator", it.Close) + return + } + it = iter.NewMergeEntryIterator(ctx, []iter.EntryIterator{it, storeItr}, req.Direction) + } - // sendBatches uses -1 to specify no limit. - batchLimit := int32(req.Limit) - if batchLimit == 0 { - batchLimit = -1 - } + defer util.LogErrorWithContext(ctx, "closing iterator", it.Close) - return sendBatches(ctx, it, queryServer, batchLimit) + // sendBatches uses -1 to specify no limit. + batchLimit := int32(req.Limit) + if batchLimit == 0 { + batchLimit = -1 + } + err = sendBatches(ctx, it, queryServer, batchLimit) + }) + + return err } // QuerySample the ingesters for series from logs matching a set of matchers. @@ -965,35 +981,41 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log return err } - it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) - if err != nil { - return err - } - if sp != nil { - sp.LogKV("event", "finished instance query sample", "selector", req.Selector, "start", req.Start, "end", req.End) - } - - if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { - storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{ - Start: start, - End: end, - Selector: req.Selector, - Shards: req.Shards, - Deletes: req.Deletes, - Plan: req.Plan, - }} - storeItr, err := i.store.SelectSamples(ctx, storeReq) + pprof.Do(ctx, pprof.Labels("path", "read", "type", "metric", "tenant", instanceID), func(c context.Context) { + var it iter.SampleIterator + it, err = instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) if err != nil { - util.LogErrorWithContext(ctx, "closing iterator", it.Close) - return err + return + } + if sp != nil { + sp.LogKV("event", "finished instance query sample", "selector", req.Selector, "start", req.Start, "end", req.End) } - it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr}) - } + if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { + storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{ + Start: start, + End: end, + Selector: req.Selector, + Shards: req.Shards, + Deletes: req.Deletes, + Plan: req.Plan, + }} + var storeItr iter.SampleIterator + storeItr, err = i.store.SelectSamples(ctx, storeReq) + if err != nil { + util.LogErrorWithContext(ctx, "closing iterator", it.Close) + return + } + + it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr}) + } + + defer util.LogErrorWithContext(ctx, "closing iterator", it.Close) - defer util.LogErrorWithContext(ctx, "closing iterator", it.Close) + err = sendSampleBatches(ctx, it, queryServer) + }) - return sendSampleBatches(ctx, it, queryServer) + return err } // asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`. @@ -1039,24 +1061,27 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq return nil, err } - // get chunk references - chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil) - if err != nil { - return nil, err - } + var resp logproto.GetChunkIDsResponse + pprof.Do(ctx, pprof.Labels("path", "read", "type", "chunkIDs", "tenant", orgID), func(c context.Context) { + // get chunk references + chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil) + if err != nil { + return + } - // todo (Callum) ingester should maybe store the whole schema config? - s := config.SchemaConfig{ - Configs: i.periodicConfigs, - } + // todo (Callum) ingester should maybe store the whole schema config? + s := config.SchemaConfig{ + Configs: i.periodicConfigs, + } - // build the response - resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}} - for _, chunks := range chunksGroups { - for _, chk := range chunks { - resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk.ChunkRef)) + // build the response + resp = logproto.GetChunkIDsResponse{ChunkIDs: []string{}} + for _, chunks := range chunksGroups { + for _, chk := range chunks { + resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk.ChunkRef)) + } } - } + }) return &resp, nil } @@ -1081,49 +1106,59 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp } } - resp, err := instance.Label(ctx, req, matchers...) - if err != nil { - return nil, err - } - - if req.Start == nil { - return resp, nil - } + var resp *logproto.LabelResponse + var storeValues []string + pprof.Do(ctx, pprof.Labels("path", "read", "type", "labels", "tenant", userID), func(c context.Context) { + resp, err = instance.Label(ctx, req, matchers...) + if err != nil { + return + } + if req.Start == nil { + return + } - // Only continue if the active index type is one of async index store types or QueryStore flag is true. - asyncStoreMaxLookBack := i.asyncStoreMaxLookBack() - if asyncStoreMaxLookBack == 0 && !i.cfg.QueryStore { - return resp, nil - } + // Only continue if the active index type is one of async index store types or QueryStore flag is true. + asyncStoreMaxLookBack := i.asyncStoreMaxLookBack() + if asyncStoreMaxLookBack == 0 && !i.cfg.QueryStore { + return + } - var cs storage.Store - var ok bool - if cs, ok = i.store.(storage.Store); !ok { - return resp, nil - } + var cs storage.Store + var ok bool + if cs, ok = i.store.(storage.Store); !ok { + return + } - maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod - if asyncStoreMaxLookBack != 0 { - maxLookBackPeriod = asyncStoreMaxLookBack - } - // Adjust the start time based on QueryStoreMaxLookBackPeriod. - start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now()) - if start.After(*req.End) { - // The request is older than we are allowed to query the store, just return what we have. - return resp, nil - } - from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) - var storeValues []string - if req.Values { - storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...) - if err != nil { - return nil, err + maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod + if asyncStoreMaxLookBack != 0 { + maxLookBackPeriod = asyncStoreMaxLookBack } - } else { - storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs") - if err != nil { - return nil, err + // Adjust the start time based on QueryStoreMaxLookBackPeriod. + start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now()) + if start.After(*req.End) { + // The request is older than we are allowed to query the store, just return what we have. + return } + from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) + + if req.Values { + storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...) + if err != nil { + return + } + } else { + storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs") + if err != nil { + return + } + } + }) + + // When wrapping the work above in the pprof.Do function we created a possible scenario where resp could + // be populated with values but an error occurred later on, prior to this profiling wrapper we would have + // always exited with a nil response and the error message, this is here to keep that behavior. + if err != nil { + return nil, err } return &logproto.LabelResponse{ @@ -1142,7 +1177,13 @@ func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo if err != nil { return nil, err } - return instance.Series(ctx, req) + + var series *logproto.SeriesResponse + pprof.Do(ctx, pprof.Labels("path", "read", "type", "series", "tenant", instanceID), func(c context.Context) { + series, err = instance.Series(ctx, req) + }) + + return series, err } func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { @@ -1163,43 +1204,47 @@ func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest return nil, err } - type f func() (*logproto.IndexStatsResponse, error) - jobs := []f{ - f(func() (*logproto.IndexStatsResponse, error) { - return instance.GetStats(ctx, req) - }), - f(func() (*logproto.IndexStatsResponse, error) { - return i.store.Stats(ctx, user, req.From, req.Through, matchers...) - }), - } - resps := make([]*logproto.IndexStatsResponse, len(jobs)) - - if err := concurrency.ForEachJob( - ctx, - len(jobs), - 2, - func(_ context.Context, idx int) error { - res, err := jobs[idx]() - resps[idx] = res - return err - }, - ); err != nil { - return nil, err - } + var merged logproto.IndexStatsResponse + pprof.Do(ctx, pprof.Labels("path", "read", "type", "stats", "tenant", user), func(c context.Context) { - merged := index_stats.MergeStats(resps...) - if sp != nil { - sp.LogKV( - "user", user, - "from", req.From.Time(), - "through", req.Through.Time(), - "matchers", syntax.MatchersString(matchers), - "streams", merged.Streams, - "chunks", merged.Chunks, - "bytes", merged.Bytes, - "entries", merged.Entries, - ) - } + type f func() (*logproto.IndexStatsResponse, error) + jobs := []f{ + f(func() (*logproto.IndexStatsResponse, error) { + return instance.GetStats(ctx, req) + }), + f(func() (*logproto.IndexStatsResponse, error) { + return i.store.Stats(ctx, user, req.From, req.Through, matchers...) + }), + } + resps := make([]*logproto.IndexStatsResponse, len(jobs)) + + if err := concurrency.ForEachJob( + ctx, + len(jobs), + 2, + func(_ context.Context, idx int) error { + res, err := jobs[idx]() + resps[idx] = res + return err + }, + ); err != nil { + return + } + + merged = index_stats.MergeStats(resps...) + if sp != nil { + sp.LogKV( + "user", user, + "from", req.From.Time(), + "through", req.Through.Time(), + "matchers", syntax.MatchersString(matchers), + "streams", merged.Streams, + "chunks", merged.Chunks, + "bytes", merged.Bytes, + "entries", merged.Entries, + ) + } + }) return &merged, nil } @@ -1220,31 +1265,33 @@ func (i *Ingester) GetVolume(ctx context.Context, req *logproto.VolumeRequest) ( return nil, err } - type f func() (*logproto.VolumeResponse, error) - jobs := []f{ - f(func() (*logproto.VolumeResponse, error) { - return instance.GetVolume(ctx, req) - }), - f(func() (*logproto.VolumeResponse, error) { - return i.store.Volume(ctx, user, req.From, req.Through, req.Limit, req.TargetLabels, req.AggregateBy, matchers...) - }), - } - resps := make([]*logproto.VolumeResponse, len(jobs)) - - if err := concurrency.ForEachJob( - ctx, - len(jobs), - 2, - func(_ context.Context, idx int) error { - res, err := jobs[idx]() - resps[idx] = res - return err - }, - ); err != nil { - return nil, err - } - - merged := seriesvolume.Merge(resps, req.Limit) + var merged *logproto.VolumeResponse + pprof.Do(ctx, pprof.Labels("path", "read", "type", "volume", "tenant", user), func(c context.Context) { + type f func() (*logproto.VolumeResponse, error) + jobs := []f{ + f(func() (*logproto.VolumeResponse, error) { + return instance.GetVolume(ctx, req) + }), + f(func() (*logproto.VolumeResponse, error) { + return i.store.Volume(ctx, user, req.From, req.Through, req.Limit, req.TargetLabels, req.AggregateBy, matchers...) + }), + } + resps := make([]*logproto.VolumeResponse, len(jobs)) + + if err := concurrency.ForEachJob( + ctx, + len(jobs), + 2, + func(_ context.Context, idx int) error { + res, err := jobs[idx]() + resps[idx] = res + return err + }, + ); err != nil { + return + } + merged = seriesvolume.Merge(resps, req.Limit) + }) return merged, nil } @@ -1401,19 +1448,22 @@ func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.Detected } } - labelMap, err := instance.LabelsWithValues(ctx, req.Start, matchers...) + var result map[string]*logproto.UniqueLabelValues + pprof.Do(ctx, pprof.Labels("path", "read", "type", "detectedLabels", "tenant", userID), func(c context.Context) { + labelMap, err := instance.LabelsWithValues(ctx, req.Start, matchers...) + if err != nil { + return + } + result = make(map[string]*logproto.UniqueLabelValues) + for label, values := range labelMap { + var uniqueValues []string + for v := range values { + uniqueValues = append(uniqueValues, v) + } - if err != nil { - return nil, err - } - result := make(map[string]*logproto.UniqueLabelValues) - for label, values := range labelMap { - var uniqueValues []string - for v := range values { - uniqueValues = append(uniqueValues, v) + result[label] = &logproto.UniqueLabelValues{Values: uniqueValues} } + }) - result[label] = &logproto.UniqueLabelValues{Values: uniqueValues} - } return &logproto.LabelToValuesResponse{Labels: result}, nil }