From 638f59fce938cb95d2d711cbc85d0cd62d7b14d3 Mon Sep 17 00:00:00 2001 From: benclive Date: Wed, 7 Aug 2024 14:06:28 +0100 Subject: [PATCH] chore: Remove unused stream index from RF1 ingester (#13758) --- pkg/ingester-rf1/instance.go | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/pkg/ingester-rf1/instance.go b/pkg/ingester-rf1/instance.go index 72f2f613a909..e05e99ba8b2f 100644 --- a/pkg/ingester-rf1/instance.go +++ b/pkg/ingester-rf1/instance.go @@ -18,7 +18,6 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/distributor/writefailures" - "github.com/grafana/loki/v3/pkg/ingester/index" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -61,7 +60,6 @@ type instance struct { buf []byte // buffer used to compute fps. streams *streamsMap - index *index.Multi mapper *FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free instanceID string @@ -140,10 +138,6 @@ func newInstance( customStreamsTracker push.UsageTracker, logger log.Logger, ) (*instance, error) { - invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards)) - if err != nil { - return nil, err - } streams := newStreamsMap() ownedStreamsSvc := newOwnedStreamService(instanceID, limiter) c := config.SchemaConfig{Configs: periodConfigs} @@ -151,7 +145,6 @@ func newInstance( cfg: cfg, streams: streams, buf: make([]byte, 0, 1024), - index: invertedIndex, instanceID: instanceID, // streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID), @@ -182,7 +175,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre // reducing the stream limits, for instance. var err error - labels, err := syntax.ParseLabels(pushReqStream.Labels) + sortedLabels, err := syntax.ParseLabels(pushReqStream.Labels) if err != nil { if i.configs.LogStreamCreation(i.instanceID) { level.Debug(util_log.Logger).Log( @@ -196,12 +189,10 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre } if err != nil { - return i.onStreamCreationError(ctx, pushReqStream, err, labels) + return i.onStreamCreationError(ctx, pushReqStream, err, sortedLabels) } - fp := i.getHashForLabels(labels) - - sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) + fp := i.getHashForLabels(sortedLabels) chunkfmt, headfmt, err := i.chunkFormatAt(minTs(&pushReqStream)) if err != nil {